This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new a444e3a ATLAS-4424: Enhanced the Import hive utility to create export zip files and run bulk import a444e3a is described below commit a444e3aa7b607e64e8b928b32b89eb14f439c94c Author: Sidharth Mishra <sidharthkmis...@gmail.com> AuthorDate: Wed Nov 3 16:43:35 2021 -0700 ATLAS-4424: Enhanced the Import hive utility to create export zip files and run bulk import Signed-off-by: Sidharth Mishra <sidmis...@apache.org> (cherry picked from commit 041e7d737efbe243ebeb525ae4af1941b1008ee2) --- addons/hive-bridge/src/bin/import-hive.sh | 18 +- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 227 +++-- .../atlas/hive/bridge/HiveMetaStoreBridgeV2.java | 1036 ++++++++++++++++++++ .../atlas/model/impexp/AtlasImportRequest.java | 5 +- .../model/migration/MigrationImportStatus.java | 29 +- .../migration/DataMigrationStatusService.java | 33 +- .../migration/ZipFileMigrationImporter.java | 10 +- .../impexp/DataMigrationStatusServiceTest.java | 10 +- .../apache/atlas/web/resources/AdminResource.java | 10 +- 9 files changed, 1265 insertions(+), 113 deletions(-) diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh index c353937..ebe6976 100755 --- a/addons/hive-bridge/src/bin/import-hive.sh +++ b/addons/hive-bridge/src/bin/import-hive.sh @@ -140,24 +140,34 @@ do -d) IMPORT_ARGS="$IMPORT_ARGS -d $1"; shift;; -t) IMPORT_ARGS="$IMPORT_ARGS -t $1"; shift;; -f) IMPORT_ARGS="$IMPORT_ARGS -f $1"; shift;; + -o) IMPORT_ARGS="$IMPORT_ARGS -o $1"; shift;; + -i) IMPORT_ARGS="$IMPORT_ARGS -i";; + -h) export HELP_OPTION="true"; IMPORT_ARGS="$IMPORT_ARGS -h";; --database) IMPORT_ARGS="$IMPORT_ARGS --database $1"; shift;; --table) IMPORT_ARGS="$IMPORT_ARGS --table $1"; shift;; --filename) IMPORT_ARGS="$IMPORT_ARGS --filename $1"; shift;; + --output) IMPORT_ARGS="$IMPORT_ARGS --output $1"; shift;; + --ignoreBulkImport) IMPORT_ARGS="$IMPORT_ARGS --ignoreBulkImport";; + --help) export HELP_OPTION="true"; IMPORT_ARGS="$IMPORT_ARGS --help";; -deleteNonExisting) IMPORT_ARGS="$IMPORT_ARGS -deleteNonExisting";; "") break;; - *) JVM_ARGS="$JVM_ARGS $option" + *) IMPORT_ARGS="$IMPORT_ARGS $option" esac done JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}" -echo "Log file for import is $LOGFILE" +if [ -z ${HELP_OPTION} ]; then + echo "Log file for import is $LOGFILE" +fi "${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS RETVAL=$? -[ $RETVAL -eq 0 ] && echo Hive metadata imported successfully! -[ $RETVAL -ne 0 ] && echo Failed to import Hive metadata! Check logs at: $LOGFILE for details. +if [ -z ${HELP_OPTION} ]; then + [ $RETVAL -eq 0 ] && echo Hive Meta Data imported successfully! + [ $RETVAL -eq 1 ] && echo Failed to import Hive Meta Data! Check logs at: $LOGFILE for details. 
+fi exit $RETVAL diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index c361ac6..28365bc 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -20,7 +20,6 @@ package org.apache.atlas.hive.bridge; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientResponse; -import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClientV2; @@ -43,12 +42,14 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.utils.PathExtractorContext; -import org.apache.commons.cli.ParseException; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.MissingArgumentException; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.ArrayUtils; @@ -105,10 +106,27 @@ public class HiveMetaStoreBridge { public static final String HIVE_TABLE_DB_EDGE_LABEL = "__hive_table.db"; public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit"; + static final String OPTION_OUTPUT_FILEPATH_SHORT = "o"; + static final String OPTION_OUTPUT_FILEPATH_LONG = "output"; + static final String OPTION_IGNORE_BULK_IMPORT_SHORT = "i"; + static final String OPTION_IGNORE_BULK_IMPORT_LONG = "ignoreBulkImport"; + static final String OPTION_DATABASE_SHORT = "d"; + static final String OPTION_DATABASE_LONG = "database"; + static final String OPTION_TABLE_SHORT = "t"; + static final String OPTION_TABLE_LONG = "table"; + static final String OPTION_IMPORT_DATA_FILE_SHORT = "f"; + static final String OPTION_IMPORT_DATA_FILE_LONG = "filename"; + static final String OPTION_FAIL_ON_ERROR = "failOnError"; + static final String OPTION_DELETE_NON_EXISTING = "deleteNonExisting"; + static final String OPTION_HELP_SHORT = "h"; + static final String OPTION_HELP_LONG = "help"; + public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2"; - private static final int EXIT_CODE_SUCCESS = 0; - private static final int EXIT_CODE_FAILED = 1; + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final int EXIT_CODE_INVALID_ARG = 2; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; private static int pageLimit = 10000; @@ -122,84 +140,63 @@ public class HiveMetaStoreBridge { public static void main(String[] args) { int exitCode = EXIT_CODE_FAILED; AtlasClientV2 atlasClientV2 = null; + Options acceptedCliOptions = prepareCommandLineOptions(); try { - Options options = new Options(); - options.addOption("d", "database", true, "Database name"); - options.addOption("t", "table", true, "Table name"); - options.addOption("f", "filename", true, "Filename"); - options.addOption("failOnError", false, "failOnError"); - 
options.addOption("deleteNonExisting", false, "Delete database and table entities in Atlas if not present in Hive"); - - CommandLine cmd = new BasicParser().parse(options, args); - boolean failOnError = cmd.hasOption("failOnError"); - boolean deleteNonExisting = cmd.hasOption("deleteNonExisting"); - LOG.info("delete non existing flag : {} ", deleteNonExisting); - - String databaseToImport = cmd.getOptionValue("d"); - String tableToImport = cmd.getOptionValue("t"); - String fileToImport = cmd.getOptionValue("f"); - Configuration atlasConf = ApplicationProperties.get(); - String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT); - - if (atlasEndpoint == null || atlasEndpoint.length == 0) { - atlasEndpoint = new String[] { DEFAULT_ATLAS_URL }; - } + CommandLine cmd = new BasicParser().parse(acceptedCliOptions, args); + List<String> argsNotProcessed = cmd.getArgList(); + if (argsNotProcessed != null && argsNotProcessed.size() > 0) { + throw new ParseException("Unrecognized arguments."); + } - if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { - String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); - - atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword); + if (cmd.hasOption(OPTION_HELP_SHORT)) { + printUsage(acceptedCliOptions); + exitCode = EXIT_CODE_SUCCESS; } else { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Configuration atlasConf = ApplicationProperties.get(); + String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT); - atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint); - } - - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2); + if (atlasEndpoint == null || atlasEndpoint.length == 0) { + atlasEndpoint = new String[] { DEFAULT_ATLAS_URL }; + } - if (deleteNonExisting) { - hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError); - exitCode = EXIT_CODE_SUCCESS; - } else if (StringUtils.isNotEmpty(fileToImport)) { - File f = new File(fileToImport); + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); - if (f.exists() && f.canRead()) { - BufferedReader br = new BufferedReader(new FileReader(f)); - String line = null; + atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword); + } else { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - while((line = br.readLine()) != null) { - String val[] = line.split(":"); + atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint); + } - if (ArrayUtils.isNotEmpty(val)) { - databaseToImport = val[0]; + boolean createZip = cmd.hasOption(OPTION_OUTPUT_FILEPATH_LONG); - if (val.length > 1) { - tableToImport = val[1]; - } else { - tableToImport = ""; - } + if (createZip) { + HiveMetaStoreBridgeV2 hiveMetaStoreBridgeV2 = new HiveMetaStoreBridgeV2(atlasConf, new HiveConf(), atlasClientV2); - hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError); - } + if (hiveMetaStoreBridgeV2.exportDataToZipAndRunAtlasImport(cmd)) { + exitCode = EXIT_CODE_SUCCESS; } - - exitCode = EXIT_CODE_SUCCESS; } else { - LOG.error("Failed to read the input file: " + fileToImport); - exitCode = EXIT_CODE_FAILED; + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2); + + if (hiveMetaStoreBridge.importDataDirectlyToAtlas(cmd)) { + exitCode 
= EXIT_CODE_SUCCESS; + } } - } else { - hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError); - exitCode = EXIT_CODE_SUCCESS; } - } catch(ParseException e) { - LOG.error("Failed to parse arguments. Error: ", e.getMessage()); - printUsage(); + LOG.error("Invalid argument. Error: {}", e.getMessage()); + System.out.println("Invalid argument. Error: " + e.getMessage()); + exitCode = EXIT_CODE_INVALID_ARG; + + if (!(e instanceof MissingArgumentException)) { + printUsage(acceptedCliOptions); + } } catch(Exception e) { - LOG.error("Import failed", e); + LOG.error("Import Failed", e); } finally { if( atlasClientV2 !=null) { atlasClientV2.close(); @@ -209,26 +206,48 @@ public class HiveMetaStoreBridge { System.exit(exitCode); } - private static void printUsage() { + private static Options prepareCommandLineOptions() { + Options acceptedCliOptions = new Options(); + + return acceptedCliOptions.addOption(OPTION_OUTPUT_FILEPATH_SHORT, OPTION_OUTPUT_FILEPATH_LONG, true, "Output path or file for Zip import") + .addOption(OPTION_IGNORE_BULK_IMPORT_SHORT, OPTION_IGNORE_BULK_IMPORT_LONG, false, "Ignore bulk Import for Zip import") + .addOption(OPTION_DATABASE_SHORT, OPTION_DATABASE_LONG, true, "Database name") + .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table name") + .addOption(OPTION_IMPORT_DATA_FILE_SHORT, OPTION_IMPORT_DATA_FILE_LONG, true, "Filename") + .addOption(OPTION_FAIL_ON_ERROR, false, "failOnError") + .addOption(OPTION_DELETE_NON_EXISTING, false, "Delete database and table entities in Atlas if not present in Hive") + .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print this help message"); + } + + private static void printUsage(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("import-hive.sh", options); + System.out.println(); + System.out.println("Usage options:"); + System.out.println(" Usage 1: import-hive.sh [-d <database> OR --database <database>] " ); + System.out.println(" Imports specified database and its tables ..."); + System.out.println(); + System.out.println(" Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]"); + System.out.println(" Imports specified table within that database ..."); System.out.println(); + System.out.println(" Usage 3: import-hive.sh"); + System.out.println(" Imports all databases and tables..."); System.out.println(); - System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>] " ); - System.out.println(" Imports specified database and its tables ..."); + System.out.println(" Usage 4: import-hive.sh -f <filename>"); + System.out.println(" Imports all databases and tables in the file..."); + System.out.println(" Format:"); + System.out.println(" database1:tbl1"); + System.out.println(" database1:tbl2"); + System.out.println(" database2:tbl2"); System.out.println(); - System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]"); - System.out.println(" Imports specified table within that database ..."); + System.out.println(" Usage 5: import-hive.sh [-deleteNonExisting] " ); + System.out.println(" Deletes databases and tables which are not in Hive ..."); System.out.println(); - System.out.println("Usage 3: import-hive.sh"); - System.out.println(" Imports all databases and tables..."); + System.out.println(" Usage 6: import-hive.sh -o <output Path or file> [-f <filename>] [-d <database> OR --database <database>] [-t <table> OR --table 
<table>]"); + System.out.println(" To create zip file with exported data and import the zip file at Atlas ..."); System.out.println(); - System.out.println("Usage 4: import-hive.sh -f <filename>"); - System.out.println(" Imports all databases and tables in the file..."); - System.out.println(" Format:"); - System.out.println(" database1:tbl1"); - System.out.println(" database1:tbl2"); - System.out.println(" database2:tbl2"); - System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] " ); - System.out.println(" Deletes databases and tables which are not in Hive ..."); + System.out.println(" Usage 7: import-hive.sh -i -o <output Path or file> [-f <filename>] [-d <database> OR --database <database>] [-t <table> OR --table <table>]"); + System.out.println(" To create zip file with exported data without importing to Atlas which can be imported later ..."); System.out.println(); } @@ -286,6 +305,54 @@ public class HiveMetaStoreBridge { return convertHdfsPathToLowerCase; } + public boolean importDataDirectlyToAtlas(CommandLine cmd) throws Exception { + LOG.info("Importing Hive metadata"); + boolean ret = false; + + String databaseToImport = cmd.getOptionValue(OPTION_DATABASE_SHORT); + String tableToImport = cmd.getOptionValue(OPTION_TABLE_SHORT); + String fileToImport = cmd.getOptionValue(OPTION_IMPORT_DATA_FILE_SHORT); + + boolean failOnError = cmd.hasOption(OPTION_FAIL_ON_ERROR); + boolean deleteNonExisting = cmd.hasOption(OPTION_DELETE_NON_EXISTING); + + LOG.info("delete non existing flag : {} ", deleteNonExisting); + + if (deleteNonExisting) { + deleteEntitiesForNonExistingHiveMetadata(failOnError); + ret = true; + } else if (StringUtils.isNotEmpty(fileToImport)) { + File f = new File(fileToImport); + + if (f.exists() && f.canRead()) { + BufferedReader br = new BufferedReader(new FileReader(f)); + String line = null; + + while((line = br.readLine()) != null) { + String val[] = line.split(":"); + + if (ArrayUtils.isNotEmpty(val)) { + databaseToImport = val[0]; + + if (val.length > 1) { + tableToImport = val[1]; + } else { + tableToImport = ""; + } + + importDatabases(failOnError, databaseToImport, tableToImport); + } + } + ret = true; + } else { + LOG.error("Failed to read the input file: " + fileToImport); + } + } else { + importDatabases(failOnError, databaseToImport, tableToImport); + ret = true; + } + return ret; + } @VisibleForTesting public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception { diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java new file mode 100644 index 0000000..0627c0e --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java @@ -0,0 +1,1036 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hive.bridge; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.hive.hook.events.BaseHiveEvent; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.hook.AtlasHookException; +import org.apache.atlas.utils.AtlasPathExtractorUtil; +import org.apache.atlas.utils.HdfsNameServiceResolver; +import org.apache.atlas.utils.AtlasConfigurationUtil; +import org.apache.atlas.utils.PathExtractorContext; +import org.apache.atlas.utils.LruCache; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.commons.cli.MissingArgumentException; +import org.apache.commons.collections.CollectionUtils; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.OutputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*; + +/** + * A Bridge Utility that imports metadata into zip file from the Hive Meta Store + * which can be exported at Atlas + */ +public class HiveMetaStoreBridgeV2 { + private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridgeV2.class); + + private static final String OPTION_DATABASE_SHORT = "d"; + private static final String OPTION_TABLE_SHORT = "t"; + private static final String 
OPTION_IMPORT_DATA_FILE_SHORT = "f"; + private static final String OPTION_OUTPUT_FILEPATH_SHORT = "o"; + private static final String OPTION_IGNORE_BULK_IMPORT_SHORT = "i"; + + public static final String CONF_PREFIX = "atlas.hook.hive."; + public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; + public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version"; + + public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + public static final String HIVE_USERNAME = "atlas.hook.hive.default.username"; + public static final String HIVE_METADATA_NAMESPACE = "atlas.metadata.namespace"; + public static final String DEFAULT_CLUSTER_NAME = "primary"; + public static final String TEMP_TABLE_PREFIX = "_temp-"; + public static final String SEP = ":".intern(); + public static final String DEFAULT_METASTORE_CATALOG = "hive"; + public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit"; + + private static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2"; + private static final String ZIP_FILE_COMMENT_FORMAT = "{\"entitiesCount\":%d, \"total\":%d}"; + private static final int DEFAULT_PAGE_LIMIT = 10000; + private static final String DEFAULT_ZIP_FILE_NAME = "import-hive-output.zip"; + private static final String ZIP_ENTRY_ENTITIES = "entities.json"; + private static final String TYPES_DEF_JSON = "atlas-typesdef.json"; + + private static final String JSON_ARRAY_START = "["; + private static final String JSON_COMMA = ","; + private static final String JSON_EMPTY_OBJECT = "{}"; + private static final String JSON_ARRAY_END = "]"; + + private static int pageLimit = DEFAULT_PAGE_LIMIT; + private String awsS3AtlasModelVersion = null; + + private final String metadataNamespace; + private final Hive hiveClient; + private final AtlasClientV2 atlasClientV2; + private final boolean convertHdfsPathToLowerCase; + + private ZipOutputStream zipOutputStream; + private String outZipFileName; + private int totalProcessedEntities = 0; + + private final Map<String, AtlasEntityWithExtInfo> entityLRUCache = new LruCache<>(10000, 0); + private final Map<Table, AtlasEntity> hiveTablesAndAtlasEntity = new HashMap<>(); + private final Map<String, AtlasEntity> dbEntities = new HashMap<>(); + private final List<Map<String, String>> databaseAndTableListToImport = new ArrayList<>(); + private final Map<String, String> qualifiedNameGuidMap = new HashMap<>(); + + /** + * Construct a HiveMetaStoreBridgeV2. 
+ * @param hiveConf {@link HiveConf} for Hive component in the cluster + */ + public HiveMetaStoreBridgeV2(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception { + this.metadataNamespace = getMetadataNamespace(atlasProperties); + this.hiveClient = Hive.get(hiveConf); + this.atlasClientV2 = atlasClientV2; + this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false); + this.awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2); + + if (atlasProperties != null) { + pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, DEFAULT_PAGE_LIMIT); + } + } + + public boolean exportDataToZipAndRunAtlasImport(CommandLine cmd) throws MissingArgumentException, IOException, HiveException, AtlasBaseException { + boolean ret = true; + boolean failOnError = cmd.hasOption("failOnError"); + + String databaseToImport = cmd.getOptionValue(OPTION_DATABASE_SHORT); + String tableToImport = cmd.getOptionValue(OPTION_TABLE_SHORT); + String importDataFile = cmd.getOptionValue(OPTION_IMPORT_DATA_FILE_SHORT); + String outputFileOrPath = cmd.getOptionValue(OPTION_OUTPUT_FILEPATH_SHORT); + + boolean ignoreBulkImport = cmd.hasOption(OPTION_IGNORE_BULK_IMPORT_SHORT); + + validateOutputFileOrPath(outputFileOrPath); + + try { + initializeZipStream(); + + if (isValidImportDataFile(importDataFile)) { + File f = new File(importDataFile); + + BufferedReader br = new BufferedReader(new FileReader(f)); + String line = null; + + while ((line = br.readLine()) != null) { + String val[] = line.split(":"); + + if (ArrayUtils.isNotEmpty(val)) { + databaseToImport = val[0]; + + if (val.length > 1) { + tableToImport = val[1]; + } else { + tableToImport = ""; + } + + importHiveDatabases(databaseToImport, tableToImport, failOnError); + } + } + } else { + importHiveDatabases(databaseToImport, tableToImport, failOnError); + } + + importHiveTables(failOnError); + importHiveColumns(failOnError); + } finally { + endWritingAndZipStream(); + } + + if (!ignoreBulkImport) { + runAtlasImport(); + } + + return ret; + } + + private void validateOutputFileOrPath(String outputFileOrPath) throws MissingArgumentException { + if (StringUtils.isBlank(outputFileOrPath)) { + throw new MissingArgumentException("Output Path/File can't be empty"); + } + + File fileOrDirToImport = new File(outputFileOrPath); + if (fileOrDirToImport.exists()) { + if (fileOrDirToImport.isDirectory()) { + this.outZipFileName = outputFileOrPath + File.separator + DEFAULT_ZIP_FILE_NAME; + LOG.info("The default output zip file {} will be created at {}", DEFAULT_ZIP_FILE_NAME, outputFileOrPath); + } else { + throw new MissingArgumentException("output file: " + outputFileOrPath + " already present"); + } + } else if (fileOrDirToImport.getParentFile().isDirectory() && outputFileOrPath.endsWith(".zip")) { + LOG.info("The mentioned output zip file {} will be created", outputFileOrPath); + this.outZipFileName = outputFileOrPath; + } else { + throw new MissingArgumentException("Invalid File/Path"); + } + } + + private boolean isValidImportDataFile(String importDataFile) throws MissingArgumentException { + boolean ret = false; + if (StringUtils.isNotBlank(importDataFile)) { + File dataFile = new File(importDataFile); + + if (!dataFile.exists() || !dataFile.canRead()) { + throw new MissingArgumentException("Invalid import data file"); + } + ret = true; + } + + return ret; + } + + private void initializeZipStream() throws IOException, 
AtlasBaseException { + this.zipOutputStream = new ZipOutputStream(getOutputStream(this.outZipFileName)); + + storeTypesDefToZip(new AtlasTypesDef()); + + startWritingEntitiesToZip(); + } + + private void storeTypesDefToZip(AtlasTypesDef typesDef) throws AtlasBaseException { + String jsonData = AtlasType.toJson(typesDef); + saveToZip(TYPES_DEF_JSON, jsonData); + } + + private void saveToZip(String fileName, String jsonData) throws AtlasBaseException { + try { + ZipEntry e = new ZipEntry(fileName); + zipOutputStream.putNextEntry(e); + writeBytes(jsonData); + zipOutputStream.closeEntry(); + } catch (IOException e) { + throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e); + } + } + + private void startWritingEntitiesToZip() throws IOException { + zipOutputStream.putNextEntry(new ZipEntry(ZIP_ENTRY_ENTITIES)); + writeBytes(JSON_ARRAY_START); + } + + private String getDatabaseToImport(String TableWithDatabase) { + String ret = null; + String val[] = TableWithDatabase.split("\\."); + if (val.length > 1) { + ret = val[0]; + } + return ret; + } + + private String getTableToImport(String TableWithDatabase) { + String ret = null; + String val[] = TableWithDatabase.split("\\."); + if (val.length > 1) { + ret = val[1]; + } + return ret; + } + + private void importHiveDatabases(String databaseToImport, String tableWithDatabaseToImport, boolean failOnError) throws HiveException, AtlasBaseException { + LOG.info("Importing Hive Databases"); + + List<String> databaseNames = null; + + if (StringUtils.isEmpty(databaseToImport) && StringUtils.isNotEmpty(tableWithDatabaseToImport)) { + if (isTableWithDatabaseName(tableWithDatabaseToImport)) { + databaseToImport = getDatabaseToImport(tableWithDatabaseToImport); + tableWithDatabaseToImport = getTableToImport(tableWithDatabaseToImport); + } + } + + if (StringUtils.isEmpty(databaseToImport)) { + //when database to import is empty, import all + databaseNames = hiveClient.getAllDatabases(); + } else { + //when database to import has some value then, import that db and all table under it. 
+ databaseNames = hiveClient.getDatabasesByPattern(databaseToImport); + } + + if (!CollectionUtils.isEmpty(databaseNames)) { + LOG.info("Found {} databases", databaseNames.size()); + for (String databaseName : databaseNames) { + try { + if (!dbEntities.containsKey(databaseName)) { + LOG.info("Importing Hive Database {}", databaseName); + AtlasEntityWithExtInfo dbEntity = writeDatabase(databaseName); + if (dbEntity != null) { + dbEntities.put(databaseName, dbEntity.getEntity()); + } + } + databaseAndTableListToImport.add(Collections.singletonMap(databaseName, tableWithDatabaseToImport)); + } catch (IOException e) { + LOG.error("Import failed for hive database {}", databaseName, e); + + if (failOnError) { + throw new AtlasBaseException(e.getMessage(), e); + } + } + } + } else { + LOG.error("No database found"); + if (failOnError) { + throw new AtlasBaseException("No database found"); + } + } + } + + private void writeEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws IOException { + if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { + Iterator<Map.Entry<String, AtlasEntity>> itr = entityWithExtInfo.getReferredEntities().entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry<String, AtlasEntity> eachEntity = itr.next(); + if (eachEntity.getValue().getTypeName().equalsIgnoreCase(HiveDataTypes.HIVE_DB.getName())) { + itr.remove(); + } + } + } + + if (!entityLRUCache.containsKey(entityWithExtInfo.getEntity().getGuid())) { + entityLRUCache.put(entityWithExtInfo.getEntity().getGuid(), entityWithExtInfo); + writeBytes(AtlasType.toJson(entityWithExtInfo) + JSON_COMMA); + } + totalProcessedEntities++; + } + + private void endWritingAndZipStream() throws IOException { + writeBytes(JSON_EMPTY_OBJECT); + writeBytes(JSON_ARRAY_END); + setStreamSize(totalProcessedEntities); + close(); + } + + private void flush() { + try { + zipOutputStream.flush(); + } catch (IOException e) { + LOG.error("Error: Flush: ", e); + } + } + + private void close() throws IOException { + zipOutputStream.flush(); + zipOutputStream.closeEntry(); + zipOutputStream.close(); + } + + private void writeBytes(String payload) throws IOException { + zipOutputStream.write(payload.getBytes()); + } + + private OutputStream getOutputStream(String fileToWrite) throws IOException { + return FileUtils.openOutputStream(new File(fileToWrite)); + } + + public String getMetadataNamespace(Configuration config) { + return AtlasConfigurationUtil.getRecentString(config, HIVE_METADATA_NAMESPACE, getClusterName(config)); + } + + private String getClusterName(Configuration config) { + return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME); + } + + public String getMetadataNamespace() { + return metadataNamespace; + } + + public boolean isConvertHdfsPathToLowerCase() { + return convertHdfsPathToLowerCase; + } + + /** + * Imports Hive tables if databaseAndTableListToImport is populated + * @param failOnError + * @throws Exception + */ + public void importHiveTables(boolean failOnError) throws HiveException, AtlasBaseException { + LOG.info("Importing Hive Tables"); + + int tablesImported = 0; + + if (CollectionUtils.isNotEmpty(databaseAndTableListToImport) && MapUtils.isNotEmpty(dbEntities)) { + for (Map<String, String> eachEntry : databaseAndTableListToImport) { + final List<Table> tableObjects; + + String databaseName = eachEntry.keySet().iterator().next(); + + if (StringUtils.isEmpty(eachEntry.values().iterator().next())) { + tableObjects = hiveClient.getAllTableObjects(databaseName); + + 
populateQualifiedNameGuidMap(HiveDataTypes.HIVE_DB.getName(), (String) dbEntities.get(databaseName).getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } else { + List<String> tableNames = hiveClient.getTablesByPattern(databaseName, eachEntry.values().iterator().next()); + tableObjects = new ArrayList<>(); + + for (String tableName : tableNames) { + Table table = hiveClient.getTable(databaseName, tableName); + tableObjects.add(table); + populateQualifiedNameGuidMap(HiveDataTypes.HIVE_TABLE.getName(), getTableQualifiedName(metadataNamespace, table)); + } + } + + if (!CollectionUtils.isEmpty(tableObjects)) { + LOG.info("Found {} tables to import in database {}", tableObjects.size(), databaseName); + + try { + for (Table table : tableObjects) { + int imported = importTable(dbEntities.get(databaseName), table, failOnError); + + tablesImported += imported; + } + } finally { + if (tablesImported == tableObjects.size()) { + LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName); + } else { + LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", + tablesImported, tableObjects.size(), databaseName); + } + } + } else { + LOG.error("No tables to import in database {}", databaseName); + if (failOnError) { + throw new AtlasBaseException("No tables to import in database - " + databaseName); + } + } + } + } + + dbEntities.clear(); + } + + private void populateQualifiedNameGuidMap(String typeName, String qualifiedName) { + try { + AtlasEntitiesWithExtInfo entitiesWithExtInfo = atlasClientV2.getEntitiesByAttribute(typeName, Collections.singletonList(Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)), true, false); + + if (entitiesWithExtInfo != null && entitiesWithExtInfo.getEntities() != null) { + for (AtlasEntity entity : entitiesWithExtInfo.getEntities()) { + qualifiedNameGuidMap.put((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getGuid()); + + for(Map.Entry<String, AtlasEntity> eachEntry : entitiesWithExtInfo.getReferredEntities().entrySet()) { + qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey()); + } + + if (typeName.equals(HiveDataTypes.HIVE_DB.getName())) { + for (String eachRelatedGuid : getAllRelatedGuids(entity)) { + AtlasEntityWithExtInfo relatedEntity = atlasClientV2.getEntityByGuid(eachRelatedGuid, true, false); + + qualifiedNameGuidMap.put((String) relatedEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), relatedEntity.getEntity().getGuid()); + for (Map.Entry<String, AtlasEntity> eachEntry : relatedEntity.getReferredEntities().entrySet()) { + qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey()); + } + } + } + } + } + } catch (AtlasServiceException e) { + LOG.info("Unable to load the related entities for type {} and qualified name {} from Atlas", typeName, qualifiedName, e); + } + } + + private Set<String> getAllRelatedGuids(AtlasEntity entity) { + Set<String> relGuidsSet = new HashSet<>(); + + for (Object o : entity.getRelationshipAttributes().values()) { + if (o instanceof AtlasObjectId) { + relGuidsSet.add(((AtlasObjectId) o).getGuid()); + } else if (o instanceof List) { + for (Object id : (List) o) { + if (id instanceof AtlasObjectId) { + relGuidsSet.add(((AtlasObjectId) id).getGuid()); + } + if (id instanceof Map) { + relGuidsSet.add((String) ((Map) id).get("guid")); + } + } + } + } + + return relGuidsSet; + } + + public void importHiveColumns(boolean 
failOnError) throws AtlasBaseException { + LOG.info("Importing Hive Columns"); + + if (MapUtils.isEmpty(hiveTablesAndAtlasEntity)) { + if (LOG.isDebugEnabled()) { + LOG.debug("No hive table present to import columns"); + } + + return; + } + + for (Map.Entry<Table, AtlasEntity> eachTable : hiveTablesAndAtlasEntity.entrySet()) { + int columnsImported = 0; + List<AtlasEntity> columnEntities = new ArrayList<>(); + + try { + List<AtlasEntity> partKeys = toColumns(eachTable.getKey().getPartitionKeys(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_PART_KEYS); + List<AtlasEntity> columns = toColumns(eachTable.getKey().getCols(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_COLUMNS); + + partKeys.stream().collect(Collectors.toCollection(() -> columnEntities)); + columns.stream().collect(Collectors.toCollection(() -> columnEntities)); + + for (AtlasEntity eachColumnEntity : columnEntities) { + writeEntityToZip(new AtlasEntityWithExtInfo(eachColumnEntity)); + columnsImported++; + } + } catch (IOException e) { + LOG.error("Column Import failed for hive table {}", eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), e); + + if (failOnError) { + throw new AtlasBaseException(e.getMessage(), e); + } + } finally { + if (columnsImported == columnEntities.size()) { + LOG.info("Successfully imported {} columns for table {}", columnsImported, eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } else { + LOG.error("Imported {} of {} columns for table {}. Please check logs for errors during import", columnsImported, columnEntities.size(), eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + } + } + + } + + private void runAtlasImport() { + AtlasImportRequest request = new AtlasImportRequest(); + request.setOption(AtlasImportRequest.UPDATE_TYPE_DEFINITION_KEY, "false"); + request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT); + + try { + AtlasImportResult importResult = atlasClientV2.importData(request, this.outZipFileName); + + if (importResult.getOperationStatus() == AtlasImportResult.OperationStatus.SUCCESS) { + LOG.info("Successfully imported the zip file {} at Atlas and imported {} entities. Number of entities to be imported {}.", this.outZipFileName, importResult.getProcessedEntities().size(), totalProcessedEntities); + } else { + LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities); + } + } catch (AtlasServiceException e) { + LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities, e); + } + } + + public int importTable(AtlasEntity dbEntity, Table table, final boolean failOnError) throws AtlasBaseException { + try { + AtlasEntityWithExtInfo tableEntity = writeTable(dbEntity, table); + + hiveTablesAndAtlasEntity.put(table, tableEntity.getEntity()); + + if (table.getTableType() == TableType.EXTERNAL_TABLE) { + String processQualifiedName = getTableProcessQualifiedName(metadataNamespace, table); + String tableLocationString = isConvertHdfsPathToLowerCase() ? 
lower(table.getDataLocation().toString()) : table.getDataLocation().toString(); + Path location = table.getDataLocation(); + String query = getCreateTableString(table, tableLocationString); + + PathExtractorContext pathExtractorCtx = new PathExtractorContext(getMetadataNamespace(), isConvertHdfsPathToLowerCase(), awsS3AtlasModelVersion); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(location, pathExtractorCtx); + AtlasEntity pathInst = entityWithExtInfo.getEntity(); + AtlasEntity tableInst = tableEntity.getEntity(); + AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName()); + + long now = System.currentTimeMillis(); + + processInst.setGuid(getGuid(processQualifiedName)); + processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName); + processInst.setAttribute(ATTRIBUTE_NAME, query); + processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); + processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS))); + processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); + String userName = table.getOwner(); + if (StringUtils.isEmpty(userName)) { + userName = ApplicationProperties.get().getString(HIVE_USERNAME, "hive"); + } + processInst.setAttribute(ATTRIBUTE_USER_NAME, userName); + processInst.setAttribute(ATTRIBUTE_START_TIME, now); + processInst.setAttribute(ATTRIBUTE_END_TIME, now); + processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE"); + processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query); + processInst.setAttribute(ATTRIBUTE_QUERY_ID, query); + processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}"); + processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query)); + + AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo(); + + createTableProcess.addEntity(processInst); + + if (pathExtractorCtx.getKnownEntities() != null) { + pathExtractorCtx.getKnownEntities().values().forEach(entity -> createTableProcess.addEntity(entity)); + } else { + createTableProcess.addEntity(pathInst); + } + + writeEntitiesToZip(createTableProcess); + } + + return 1; + } catch (Exception e) { + LOG.error("Import failed for hive_table {}", table.getTableName(), e); + + if (failOnError) { + throw new AtlasBaseException(e.getMessage(), e); + } + + return 0; + } + } + + /** + * Write db entity + * @param databaseName + * @return + * @throws Exception + */ + private AtlasEntityWithExtInfo writeDatabase(String databaseName) throws HiveException, IOException { + AtlasEntityWithExtInfo ret = null; + Database db = hiveClient.getDatabase(databaseName); + + if (db != null) { + ret = new AtlasEntityWithExtInfo(toDbEntity(db)); + writeEntityToZip(ret); + } + + return ret; + } + + private AtlasEntityWithExtInfo writeTable(AtlasEntity dbEntity, Table table) throws AtlasHookException { + try { + AtlasEntityWithExtInfo tableEntity = toTableEntity(dbEntity, table); + writeEntityToZip(tableEntity); + + return tableEntity; + } catch (Exception e) { + throw new AtlasHookException("HiveMetaStoreBridgeV2.registerTable() failed.", e); + } + } + + /** + * Write an entity to Zip file + * @param entity + * @return + * @throws Exception + */ + private void writeEntityToZip(AtlasEntityWithExtInfo entity) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing {} entity: 
{}", entity.getEntity().getTypeName(), entity); + } + + writeEntity(entity); + clearRelationshipAttributes(entity.getEntity()); + flush(); + } + + /** + * Registers an entity in atlas + * @param entities + * @return + * @throws Exception + */ + private void writeEntitiesToZip(AtlasEntitiesWithExtInfo entities) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing {} entities: {}", entities.getEntities().size(), entities); + } + + for (AtlasEntity entity : entities.getEntities()) { + writeEntity(new AtlasEntityWithExtInfo(entity)); + } + + flush(); + clearRelationshipAttributes(entities); + } + + /** + * Create a Hive Database entity + * @param hiveDB The Hive {@link Database} object from which to map properties + * @return new Hive Database AtlasEntity + * @throws HiveException + */ + private AtlasEntity toDbEntity(Database hiveDB) { + return toDbEntity(hiveDB, null); + } + + private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) { + if (dbEntity == null) { + dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName()); + } + + String dbName = getDatabaseName(hiveDB); + + String qualifiedName = getDBQualifiedName(metadataNamespace, dbName); + dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); + + dbEntity.setGuid(getGuid(true, qualifiedName)); + + dbEntity.setAttribute(ATTRIBUTE_NAME, dbName); + dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription()); + dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName()); + + dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); + dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri())); + dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters()); + + if (hiveDB.getOwnerType() != null) { + dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue())); + } + + return dbEntity; + } + + private String getDBGuidFromAtlas(String dBQualifiedName) { + String guid = null; + try { + guid = atlasClientV2.getEntityHeaderByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dBQualifiedName)).getGuid(); + } catch (AtlasServiceException e) { + LOG.warn("Failed to get DB guid from Atlas with qualified name {}", dBQualifiedName, e); + } + return guid; + } + + public static String getDatabaseName(Database hiveDB) { + String dbName = hiveDB.getName().toLowerCase(); + String catalogName = hiveDB.getCatalogName() != null ? hiveDB.getCatalogName().toLowerCase() : null; + + if (StringUtils.isNotEmpty(catalogName) && !StringUtils.equals(catalogName, DEFAULT_METASTORE_CATALOG)) { + dbName = catalogName + SEP + dbName; + } + + return dbName; + } + + /** + * Create a new table instance in Atlas + * @param database AtlasEntity for Hive {@link AtlasEntity} to which this table belongs + * @param hiveTable reference to the Hive {@link Table} from which to map properties + * @return Newly created Hive AtlasEntity + * @throws Exception + */ + private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable) throws AtlasHookException { + AtlasEntityWithExtInfo table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName())); + + AtlasEntity tableEntity = table.getEntity(); + String tableQualifiedName = getTableQualifiedName(metadataNamespace, hiveTable); + long createTime = BaseHiveEvent.getTableCreateTime(hiveTable); + long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? 
hiveTable.getLastAccessTime() : createTime; + + tableEntity.setGuid(getGuid(tableQualifiedName)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB)); + tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); + tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase()); + tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner()); + + tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime); + tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime); + tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention()); + tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters()); + tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT)); + tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name()); + tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary()); + + if (hiveTable.getViewOriginalText() != null) { + tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText()); + } + + if (hiveTable.getViewExpandedText() != null) { + tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText()); + } + + AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), getStorageDescQFName(tableQualifiedName), AtlasTypeUtil.getObjectId(tableEntity)); + + tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + + table.addReferredEntity(database); + table.addReferredEntity(sdEntity); + table.setEntity(tableEntity); + + return table; + } + + private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String sdQualifiedName, AtlasObjectId tableId) { + AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName()); + + ret.setGuid(getGuid(sdQualifiedName)); + ret.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName); + ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters()); + ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation())); + ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat()); + ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat()); + ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed()); + ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets()); + ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories()); + + if (storageDesc.getBucketCols().size() > 0) { + ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols()); + } + + if (storageDesc.getSerdeInfo() != null) { + SerDeInfo serdeInfo = storageDesc.getSerdeInfo(); + + LOG.info("serdeInfo = {}", serdeInfo); + AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName()); + + serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName()); + serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib()); + serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters()); + + ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct); + } + + if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) { + List<AtlasStruct> sortColsStruct = new ArrayList<>(); + + for (Order sortcol : 
storageDesc.getSortCols()) { + String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName(); + AtlasStruct colStruct = new AtlasStruct(hiveOrderName); + colStruct.setAttribute("col", sortcol.getCol()); + colStruct.setAttribute("order", sortcol.getOrder()); + + sortColsStruct.add(colStruct); + } + + ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct); + } + + return ret; + } + + private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table, String relationshipType) { + List<AtlasEntity> ret = new ArrayList<>(); + + int columnPosition = 0; + for (FieldSchema fs : schemaList) { + LOG.debug("Processing field {}", fs); + + AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName()); + + String columnQualifiedName = getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName()); + + column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName); + column.setGuid(getGuid(columnQualifiedName)); + + column.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(table, relationshipType)); + + column.setAttribute(ATTRIBUTE_NAME, fs.getName()); + column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER)); + column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType()); + column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++); + column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment()); + + ret.add(column); + } + return ret; + } + + private String getCreateTableString(Table table, String location){ + String colString = ""; + List<FieldSchema> colList = table.getAllCols(); + + if (colList != null) { + for (FieldSchema col : colList) { + colString += col.getName() + " " + col.getType() + ","; + } + + if (colList.size() > 0) { + colString = colString.substring(0, colString.length() - 1); + colString = "(" + colString + ")"; + } + } + + String query = "create external table " + table.getTableName() + colString + " location '" + location + "'"; + + return query; + } + + private String lower(String str) { + if (StringUtils.isEmpty(str)) { + return ""; + } + + return str.toLowerCase().trim(); + } + + /** + * Construct the qualified name used to uniquely identify a Table instance in Atlas. + * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs + * @param table hive table for which the qualified name is needed + * @return Unique qualified name to identify the Table instance in Atlas. + */ + private static String getTableQualifiedName(String metadataNamespace, Table table) { + return getTableQualifiedName(metadataNamespace, table.getDbName(), table.getTableName(), table.isTemporary()); + } + + /** + * Construct the qualified name used to uniquely identify a Database instance in Atlas. + * @param metadataNamespace Name of the cluster to which the Hive component belongs + * @param dbName Name of the Hive database + * @return Unique qualified name to identify the Database instance in Atlas. + */ + public static String getDBQualifiedName(String metadataNamespace, String dbName) { + return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace); + } + + /** + * Construct the qualified name used to uniquely identify a Table instance in Atlas. + * @param metadataNamespace Name of the cluster to which the Hive component belongs + * @param dbName Name of the Hive database to which the Table belongs + * @param tableName Name of the Hive table + * @param isTemporaryTable is this a temporary table + * @return Unique qualified name to identify the Table instance in Atlas. 
+ */ + public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName, boolean isTemporaryTable) { + String tableTempName = tableName; + + if (isTemporaryTable) { + if (SessionState.get() != null && SessionState.get().getSessionId() != null) { + tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId(); + } else { + tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10); + } + } + + return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace); + } + + public static String getTableProcessQualifiedName(String metadataNamespace, Table table) { + String tableQualifiedName = getTableQualifiedName(metadataNamespace, table); + long createdTime = getTableCreatedTime(table); + + return tableQualifiedName + SEP + createdTime; + } + + public static String getStorageDescQFName(String tableQualifiedName) { + return tableQualifiedName + "_storage"; + } + + public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) { + final String[] parts = tableQualifiedName.split("@"); + final String tableName = parts[0]; + final String metadataNamespace = parts[1]; + + return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace); + } + + public static long getTableCreatedTime(Table table) { + return table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR; + } + + private void clearRelationshipAttributes(AtlasEntitiesWithExtInfo entities) { + if (entities != null) { + if (entities.getEntities() != null) { + for (AtlasEntity entity : entities.getEntities()) { + clearRelationshipAttributes(entity);; + } + } + + if (entities.getReferredEntities() != null) { + clearRelationshipAttributes(entities.getReferredEntities().values()); + } + } + } + + private void clearRelationshipAttributes(Collection<AtlasEntity> entities) { + if (entities != null) { + for (AtlasEntity entity : entities) { + clearRelationshipAttributes(entity); + } + } + } + + private void clearRelationshipAttributes(AtlasEntity entity) { + if (entity != null && entity.getRelationshipAttributes() != null) { + entity.getRelationshipAttributes().clear(); + } + } + + private boolean isTableWithDatabaseName(String tableName) { + boolean ret = false; + if (tableName.contains(".")) { + ret = true; + } + return ret; + } + + private String getGuid(String qualifiedName) { + return getGuid(false, qualifiedName); + } + + private String getGuid(boolean isDBType, String qualifiedName) { + String guid = null; + + if (qualifiedNameGuidMap.containsKey(qualifiedName)) { + guid = qualifiedNameGuidMap.get(qualifiedName); + } else if (isDBType) { + guid = getDBGuidFromAtlas(qualifiedName); + } + + if (StringUtils.isBlank(guid)) { + guid = generateGuid(); + } + + return guid; + } + + private String generateGuid() { + return UUID.randomUUID().toString(); + } + + public void setStreamSize(long size) { + zipOutputStream.setComment(String.format(ZIP_FILE_COMMENT_FORMAT, size, -1)); + } +} \ No newline at end of file diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 2c18704..cbc1aa9 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -17,7 +17,6 @@ */ package org.apache.atlas.model.impexp; - import com.fasterxml.jackson.annotation.JsonAnySetter; import 
com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -50,10 +49,10 @@ public class AtlasImportRequest implements Serializable { public static final String OPTION_KEY_BATCH_SIZE = "batchSize"; public static final String OPTION_KEY_FORMAT = "format"; public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect"; - public static final String START_POSITION_KEY = "startPosition"; + public static final String START_POSITION_KEY = "startPosition"; + public static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition"; private static final String START_GUID_KEY = "startGuid"; private static final String FILE_NAME_KEY = "fileName"; - private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition"; private static final String OPTION_KEY_STREAM_SIZE = "size"; private Map<String, String> options; diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java index 3430fda..1db5f20 100644 --- a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java +++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java @@ -21,12 +21,7 @@ package org.apache.atlas.model.migration; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.atlas.model.AtlasBaseModelObject; import org.apache.atlas.model.impexp.MigrationStatus; -import org.apache.commons.lang.StringUtils; - -import java.io.Serializable; -import java.util.Date; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; @@ -36,27 +31,43 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ @JsonIgnoreProperties(ignoreUnknown = true) public class MigrationImportStatus extends MigrationStatus { private String name; + private String fileHash; public MigrationImportStatus() { } public MigrationImportStatus(String name) { - this.name = name; + this.name = name; + this.fileHash = name; + } + + public MigrationImportStatus(String name, String fileHash) { + this.name = name; + this.fileHash = fileHash; } public String getName() { return name; } + public String getFileHash() { + return fileHash; + } + public void setName(String name) { this.name = name; } + public void setFileHash(String fileHash) { + this.fileHash = fileHash; + } + @Override public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(", name=").append(name); - sb.append(super.toString()); + final StringBuilder sb = new StringBuilder("MigrationImportStatus{"); + sb.append("name='").append(name).append('\''); + sb.append(", fileHash='").append(fileHash).append('\''); + sb.append('}'); return sb.toString(); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java index a22c687..5b22f9c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java @@ -24,9 +24,12 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider; import 
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
index a22c687..5b22f9c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -24,9 +24,12 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.Date;

 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
@@ -50,7 +53,12 @@ public class DataMigrationStatusService {

     public void init(String fileToImport) {
-        this.status = new MigrationImportStatus(fileToImport);
+        try {
+            this.status = new MigrationImportStatus(fileToImport, DigestUtils.md5Hex(new FileInputStream(fileToImport)));
+        } catch (IOException e) {
+            LOG.error("Not able to create Migration status", e);
+        }
+
         if (!this.migrationStatusVertexManagement.exists(fileToImport)) {
             return;
         }
@@ -59,21 +67,28 @@ public class DataMigrationStatusService {
     }

     public MigrationImportStatus getCreate(String fileName) {
-        return getCreate(new MigrationImportStatus(fileName));
+        MigrationImportStatus create = null;
+        try {
+            create = getCreate(new MigrationImportStatus(fileName, DigestUtils.md5Hex(new FileInputStream(fileName))));
+        } catch (IOException e) {
+            LOG.error("Exception occurred while creating migration import", e);
+        }
+
+        return create;
     }

     public MigrationImportStatus getCreate(MigrationImportStatus status) {
         try {
             this.status = this.migrationStatusVertexManagement.createOrUpdate(status);
         } catch (Exception ex) {
-            LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getName(), ex);
+            LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getFileHash(), ex);
         }

         return this.status;
     }

     public MigrationImportStatus getStatus() {
-        if (this.status != null && this.migrationStatusVertexManagement.exists(this.status.getName())) {
+        if (this.status != null && this.migrationStatusVertexManagement.exists(this.status.getFileHash())) {
             return getCreate(this.status);
         }
@@ -89,8 +104,8 @@ ...
             return;
         }

-        MigrationImportStatus status = getByName(this.status.getName());
-        this.migrationStatusVertexManagement.delete(status.getName());
+        MigrationImportStatus status = getByName(this.status.getFileHash());
+        this.migrationStatusVertexManagement.delete(status.getFileHash());

         this.status = null;
     }
@@ -118,7 +133,7 @@ ...
     }

     public MigrationImportStatus createOrUpdate(MigrationImportStatus status) {
-        this.vertex = findByNameInternal(status.getName());
+        this.vertex = findByNameInternal(status.getFileHash());

         if (this.vertex == null) {
             this.vertex = graph.addVertex();
@@ -192,7 +207,7 @@ ...
     private void updateVertex(AtlasVertex vertex, MigrationImportStatus status) {
         try {
-            setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getName());
+            setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getFileHash());

             setEncodedProperty(vertex, PROPERTY_KEY_START_TIME,
                                (status.getStartTime() != null)
@@ -213,7 +228,7 @@ ...
         MigrationImportStatus ret = new MigrationImportStatus();

         try {
-            ret.setName(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));
+            ret.setFileHash(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));

             Long dateValue = getEncodedProperty(vertex, PROPERTY_KEY_START_TIME, Long.class);

             if (dateValue != null) {
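The md5Hex calls above read the whole zip through a FileInputStream that is not explicitly closed. The sketch below, shown only as an alternative and not as part of the patch, uses the same commons-codec call inside try-with-resources so the identical hash is computed and the file handle is released; the path is a placeholder.

    import org.apache.commons.codec.digest.DigestUtils;

    import java.io.FileInputStream;
    import java.io.IOException;

    public class FileHashExample {
        // Same DigestUtils.md5Hex(InputStream) call the patch uses, wrapped in
        // try-with-resources so the stream is closed after hashing.
        static String md5Of(String path) throws IOException {
            try (FileInputStream in = new FileInputStream(path)) {
                return DigestUtils.md5Hex(in);
            }
        }

        public static void main(String[] args) throws IOException {
            // Placeholder path - point it at any existing file to try it out.
            System.out.println(md5Of("/tmp/atlas-migration-data.zip"));
        }
    }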
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index bfb1148..2b3f179 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -26,7 +26,9 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.atlas.repository.impexp.ZipExportFileNames;
 import org.apache.atlas.type.AtlasType;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.commons.lang.ArrayUtils;
@@ -196,7 +198,13 @@ public class ZipFileMigrationImporter implements Runnable {
     }

     private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) {
-        MigrationImportStatus status = new MigrationImportStatus(fileName);
+        MigrationImportStatus status = null;
+        try {
+            status = new MigrationImportStatus(fileName, DigestUtils.md5Hex(new FileInputStream(fileName)));
+        } catch (IOException e) {
+            LOG.error("Exception occurred while creating migration import", e);
+        }
+
         status.setTotalCount(streamSize);

         MigrationImportStatus statusRetrieved = dataMigrationStatusService.getCreate(status);
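Note that in getCreateMigrationStatus above, status stays null if the hash computation throws, so the subsequent setTotalCount call would fail. A defensive variant, sketched here purely for illustration (MigrationStatusFactory is a hypothetical helper, not part of this patch), falls back to the one-argument constructor, which seeds fileHash with the name.

    import org.apache.atlas.model.migration.MigrationImportStatus;
    import org.apache.commons.codec.digest.DigestUtils;

    import java.io.FileInputStream;
    import java.io.IOException;

    public class MigrationStatusFactory {
        // Builds a status keyed by the zip's MD5 hash and never returns null:
        // if the file cannot be read, it falls back to the one-argument
        // constructor, where the name doubles as the hash.
        public static MigrationImportStatus forFile(String fileName, int streamSize) {
            MigrationImportStatus status;

            try (FileInputStream in = new FileInputStream(fileName)) {
                status = new MigrationImportStatus(fileName, DigestUtils.md5Hex(in));
            } catch (IOException e) {
                status = new MigrationImportStatus(fileName);
            }

            status.setTotalCount(streamSize);

            return status;
        }
    }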
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
index f1dc990..6d368f4 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
@@ -19,13 +19,14 @@ package org.apache.atlas.repository.impexp;

 import com.google.inject.Inject;
 import org.apache.atlas.TestModules;
-import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;

+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.Date;

 import static org.testng.Assert.assertEquals;
@@ -38,12 +39,13 @@ public class DataMigrationStatusServiceTest {
     AtlasGraph atlasGraph;

     @Test
-    public void createUpdateDelete() {
+    public void createUpdateDelete() throws IOException {
         final String STATUS_DONE = "DONE";

         DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(atlasGraph);

-        MigrationImportStatus expected = new MigrationImportStatus("/tmp/defg.zip");
+        MigrationImportStatus expected = new MigrationImportStatus("DUMMY-HASH");
+
         expected.setTotalCount(3333);
         expected.setCurrentIndex(20);
         expected.setStartTime(new Date());
@@ -51,7 +53,7 @@ ...
         MigrationImportStatus ret = dataMigrationStatusService.getCreate(expected);

         assertNotNull(ret);
-        assertEquals(ret.getName(), expected.getName());
+        assertEquals(ret.getFileHash(), expected.getFileHash());
         assertEquals(ret.getStartTime(), expected.getStartTime());
         assertEquals(ret.getTotalCount(), expected.getTotalCount());
         assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 135b94b..0580f7f 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -831,13 +831,17 @@ public class AdminResource {
     }

     private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException {
-        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
-
         Map<String, Object> optionMap = new HashMap<>();
         optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
         String params = AtlasJson.toJson(optionMap);

-        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+        if(result.getExportResult().getRequest() == null) {
+            int resultCount = result.getProcessedEntities().size();
+            auditService.add(AuditOperation.IMPORT, params, AtlasJson.toJson(result.getMetrics()), resultCount);
+        } else {
+            List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
+            auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+        }
     }

     private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException {