This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 25fe3b2 ATLAS-4425: Added support for Migration import to run on a directory with multiple zip files 25fe3b2 is described below commit 25fe3b2966f848ca13e4ab006434bf0923edf21e Author: Sidharth Mishra <sidharthkmis...@gmail.com> AuthorDate: Thu Oct 7 16:56:22 2021 -0700 ATLAS-4425: Added support for Migration import to run on a directory with multiple zip files Signed-off-by: sidmishra <sidmis...@apache.org> (cherry picked from commit 11627e33d3b262d60ae0a3c6338f7963ffedaeab) --- .../atlas/repository/impexp/ZipSourceDirect.java | 2 +- .../migration/ZipFileMigrationImporter.java | 155 ++++++++++++++++++--- 2 files changed, 136 insertions(+), 21 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java index 04342fa..5cf1b74 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java @@ -257,9 +257,9 @@ public class ZipSourceDirect implements EntityImportStream { if (zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) { String json = getJsonPayloadFromZipEntryStream(this.zipInputStream); this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class); + zipEntryNext = zipInputStream.getNextEntry(); } - zipEntryNext = zipInputStream.getNextEntry(); if (zipEntryNext.getName().equals(ZIP_ENTRY_ENTITIES)) { this.entitiesArrayParser = new EntitiesArrayParser(zipInputStream); } else { diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index d56261f..bfb1148 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -27,6 +27,9 @@ import org.apache.atlas.model.migration.MigrationImportStatus; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +38,11 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.FileFilter; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.zip.ZipFile; @@ -48,35 +55,143 @@ public class ZipFileMigrationImporter implements Runnable { private static final String DEFAULT_BATCH_SIZE = "100"; private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount"; private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total"; - + private static final String FILE_EXTENSION_ZIP = ".zip"; private final static String ENV_USER_NAME = "user.name"; + private final static String ARCHIVE_DIR = "archive"; - private final ImportService importService; - private final String fileToImport; - private DataMigrationStatusService dataMigrationStatusService; - private MigrationImportStatus migrationImportStatus; + private final ImportService importService; + private List<String> filesToImport; + private DataMigrationStatusService dataMigrationStatusService; + private MigrationImportStatus migrationImportStatus; + private File archiveDir; + /** + * Input: + * fileName : can support wildcards. 
If it contains wildcards then all matching files will be imported + */ public ZipFileMigrationImporter(ImportService importService, String fileName) { this.importService = importService; - this.fileToImport = fileName; this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance()); + + initialize(fileName); + } + + private void initialize(String fileName) { + this.filesToImport = getAllFilesToImport(fileName); + + if (CollectionUtils.isNotEmpty(this.filesToImport)) { + createArchiveDirectory(fileName); + } } @Override public void run() { - try { - detectFileToImport(); + for (String fileToImport : filesToImport) { + try { + detectFileToImport(fileToImport); - int streamSize = getStreamSizeFromComment(fileToImport); - migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize); - performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex())); - dataMigrationStatusService.setStatus("DONE"); - } catch (IOException e) { - LOG.error("Migration Import: IO Error!", e); - dataMigrationStatusService.setStatus("FAIL"); - } catch (AtlasBaseException e) { - LOG.error("Migration Import: Error!", e); - dataMigrationStatusService.setStatus("FAIL"); + int streamSize = getStreamSizeFromComment(fileToImport); + migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize); + performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex())); + dataMigrationStatusService.setStatus("DONE"); + + moveZipFileToArchiveDir(fileToImport); + } catch (IOException e) { + LOG.error("Migration Import: IO Error!", e); + dataMigrationStatusService.setStatus("FAIL"); + } catch (AtlasBaseException e) { + LOG.error("Migration Import: Error!", e); + dataMigrationStatusService.setStatus("FAIL"); + } + } + } + + /** + * Input: + * fileName : If it contains wildcards then all matching files will be discovered + */ + private List<String> getAllFilesToImport(String fileName) { 
+ List<String> ret = new ArrayList<>(); + File fileToImport = new File(fileName); + + if (fileToImport.exists() && fileToImport.isFile()) { + //Input file present so no need to expand + LOG.info("Migration Import: zip file for import: " + fileToImport); + + ret.add(fileToImport.getAbsolutePath()); + } else { + //The fileName might have wildcard + String dirPath = new File(fileToImport.getParent()).getAbsolutePath(); + File importDataDir = new File(dirPath); + + if (importDataDir.exists() && importDataDir.isDirectory()) { + String fileNameWithWildcard = fileToImport.getName(); + FileFilter fileFilter = new WildcardFileFilter(fileNameWithWildcard); + + File[] importFiles = importDataDir.listFiles(fileFilter); + + if (ArrayUtils.isNotEmpty(importFiles)) { + Arrays.sort(importFiles); + + LOG.info("Migration Import: zip files for import: "); + + for (File importFile : importFiles) { + if (isValidImportFile(importFile)) { + LOG.info(importFile.getName() + " with absolute path - " + importFile.getAbsolutePath()); + ret.add(importFile.getAbsolutePath()); + } else { + LOG.warn("Ignoring " + importFile.getAbsolutePath() + " as it is not a file or does not end with extension " + FILE_EXTENSION_ZIP); + } + } + } else { + LOG.warn("Migration Import: No files to import"); + } + } + } + + return ret; + } + + private boolean isValidImportFile(File importFile) { + return importFile.isFile() && StringUtils.endsWithIgnoreCase(importFile.getName(), FILE_EXTENSION_ZIP); + } + + private void createArchiveDirectory(String fileName) { + File fileToImport = new File(fileName); + String parentPath = new File(fileToImport.getParent()).getAbsolutePath(); + + this.archiveDir = new File(parentPath + File.separator + ARCHIVE_DIR); + + if (this.archiveDir.exists() && !this.archiveDir.canWrite()) { + LOG.warn("Migration Import: No write permission to archive directory {}", this.archiveDir.getAbsolutePath()); + this.archiveDir = null; + } else if (!this.archiveDir.exists() && 
!this.archiveDir.getParentFile().canWrite()) { + LOG.warn("Migration Import: No permission to create archive directory {}", this.archiveDir.getAbsolutePath()); + this.archiveDir = null; + } else { + this.archiveDir.mkdirs(); + LOG.info("Migration Import: archive directory for zip files: {}", this.archiveDir.getAbsolutePath()); + } + } + + private void moveZipFileToArchiveDir(String srcFilePath) { + if (this.archiveDir == null) { + return; + } + + File sourceFile = new File(srcFilePath); + String newFile = archiveDir.getAbsolutePath() + File.separator + sourceFile.getName(); + + if (!sourceFile.canWrite()) { + LOG.warn("Migration Import: No permission to archive the zip file {}", sourceFile.getAbsolutePath()); + this.archiveDir = null; + } else { + if (sourceFile.renameTo(new File(newFile))) { + sourceFile.delete(); + LOG.info("Migration Import: Successfully archived the zip file: " + srcFilePath + " to " + this.archiveDir.getAbsolutePath()); + } else { + LOG.warn("Migration Import: Failed to archive the zip file: " + srcFilePath); + } } } @@ -91,7 +206,7 @@ public class ZipFileMigrationImporter implements Runnable { return statusRetrieved; } - private void detectFileToImport() throws IOException { + private void detectFileToImport(String fileToImport) throws IOException { FileWatcher fileWatcher = new FileWatcher(fileToImport); fileWatcher.start(); } @@ -126,7 +241,7 @@ public class ZipFileMigrationImporter implements Runnable { private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException { try { LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition); - InputStream fs = new FileInputStream(new File(fileToImport)); + InputStream fs = new FileInputStream(fileToImport); RequestContext.get().setUser(getUserNameFromEnvironment(), null); importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition),