This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 1ae8dc9f9adc233ca36f56304ef624610b18e050 Author: Nikhil Bonte <[email protected]> AuthorDate: Fri Feb 14 09:07:42 2020 -0800 ATLAS-3595, ATLAS-3603: Auto-start migration import for ZIP file-based migration. Signed-off-by: Ashutosh Mestry <[email protected]> (cherry picked from commit 3e035c45a9dd695471d0927bbcdd72108094c828) --- .../java/org/apache/atlas/service/Services.java | 16 ++- .../repository/migration/DataMigrationService.java | 19 ++- .../atlas/repository/migration/FileWatcher.java | 149 +++++++++++++++++++++ .../migration/ZipFileMigrationImporter.java | 93 +++++++++++++ 4 files changed, 271 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java index 7b36db5..75eba03 100644 --- a/common/src/main/java/org/apache/atlas/service/Services.java +++ b/common/src/main/java/org/apache/atlas/service/Services.java @@ -40,10 +40,12 @@ import static org.apache.atlas.AtlasConstants.ATLAS_SERVICES_ENABLED; public class Services { public static final Logger LOG = LoggerFactory.getLogger(Services.class); private static final String DATA_MIGRATION_CLASS_NAME_DEFAULT = "DataMigrationService"; + private static final String FILE_EXTENSION_ZIP = ".zip"; private final List<Service> services; private final String dataMigrationClassName; private final boolean servicesEnabled; + private final String migrationDirName; private final boolean migrationEnabled; @Inject @@ -51,7 +53,8 @@ public class Services { this.services = services; this.dataMigrationClassName = configuration.getString("atlas.migration.class.name", DATA_MIGRATION_CLASS_NAME_DEFAULT); this.servicesEnabled = configuration.getBoolean(ATLAS_SERVICES_ENABLED, true); - this.migrationEnabled = StringUtils.isNotEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME)); + this.migrationDirName = configuration.getString(ATLAS_MIGRATION_MODE_FILENAME); + this.migrationEnabled = StringUtils.isNotEmpty(migrationDirName); } @PostConstruct @@ -92,11 +95,22 @@ public class Services { private boolean isServiceUsed(Service service) { if (isDataMigrationService(service)) { return migrationEnabled; + } else if (isZipFileMigration()) { + return isNeededForZipFileMigration(service); } else { return !migrationEnabled && servicesEnabled; } } + private boolean isZipFileMigration() { + return migrationEnabled && StringUtils.endsWithIgnoreCase(migrationDirName, FILE_EXTENSION_ZIP); + } + + private boolean isNeededForZipFileMigration(Service svc) { + return svc.getClass().getSuperclass().getSimpleName().equals("AbstractStorageBasedAuditRepository") || + svc.getClass().getSuperclass().getSimpleName().equals("AbstractNotification"); + } + private boolean isDataMigrationService(Service svc) { return svc.getClass().getSimpleName().equals(dataMigrationClassName); } diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java index 1701361..0a2257e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java @@ -18,20 +18,21 @@ package org.apache.atlas.repository.migration; +import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graphdb.GraphDBMigrator; +import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.repository.impexp.ImportTypeDefProcessor; import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; +import org.apache.atlas.service.Service; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.configuration.Configuration; -import org.apache.atlas.AtlasException; -import org.apache.atlas.service.Service; import org.apache.commons.io.FileUtils; -import org.apache.solr.common.StringUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -48,6 +49,8 @@ import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME; public class DataMigrationService implements Service { private static final Logger LOG = LoggerFactory.getLogger(DataMigrationService.class); + private static final String FILE_EXTENSION_ZIP = ".zip"; + private static String ATLAS_MIGRATION_DATA_NAME = "atlas-migration-data.json"; private static String ATLAS_MIGRATION_TYPESDEF_NAME = "atlas-migration-typesdef.json"; @@ -57,9 +60,15 @@ public class DataMigrationService implements Service { @Inject public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration, GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer, - AtlasTypeRegistry typeRegistry) { + AtlasTypeRegistry typeRegistry, ImportService importService) { this.configuration = configuration; - this.thread = new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer)); + + + String fileName = getFileName(); + boolean zipFileBasedMigrationImport = StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP); + this.thread = (zipFileBasedMigrationImport) + ? new Thread(new ZipFileMigrationImporter(importService, fileName), "zipFileBasedMigrationImporter") + : new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, fileName, indexer)); } @Override diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java b/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java new file mode 100644 index 0000000..d40015a --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.migration; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; + +public class FileWatcher { + private static final Logger LOG = LoggerFactory.getLogger(FileWatcher.class); + + private final static int MAX_TIMES_PAUSE = 10; + private final static int PAUSE_INTERVAL = 5000; // 5 secs + + private int checkIncrement; + private final File fileToWatch; + + public FileWatcher(String filePath) { + this.checkIncrement = 1; + this.fileToWatch = new File(filePath); + } + + public void start() throws IOException { + if (existsAndReadyCheck()) { + return; + } + + WatchService watcher = FileSystems.getDefault().newWatchService(); + Path pathToWatch = FileSystems.getDefault().getPath(fileToWatch.getParent()); + register(watcher, pathToWatch); + + try { + LOG.info(String.format("Migration File Watcher: Watching: %s", fileToWatch.toString())); + startWatching(watcher); + } catch (InterruptedException ex) { + LOG.error("Migration File Watcher: Interrupted!"); + } finally { + watcher.close(); + } + } + + private void startWatching(WatchService watcher) throws InterruptedException { + while (true) { + WatchKey watchKey = watcher.take(); + if (watchKey == null) { + continue; + } + + for (WatchEvent event : watchKey.pollEvents()) { + if (checkIfFileAvailableAndReady(event)) { + return; + } + } + + watchKey.reset(); + } + } + + private void register(WatchService watcher, Path path) throws IOException { + try { + path.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY); + } catch (IOException e) { + LOG.error("Migration File Watcher: Error while registering event {}!", path); + throw e; + } + } + + private boolean checkIfFileAvailableAndReady(WatchEvent event) { + WatchEvent<Path> watchEvent = event; + Path path = watchEvent.context(); + + if (!path.toString().equals(fileToWatch.getName())) { + return false; + } + + return existsAndReadyCheck(); + } + + private boolean existsAndReadyCheck() { + boolean ret = fileToWatch.exists() && fileToWatch.canRead(); + if (ret) { + try { + return isReadyForUse(fileToWatch); + } catch (InterruptedException e) { + LOG.error("Migration File Watcher: Interrupted {}!", fileToWatch); + return false; + } + } else { + LOG.info(String.format("Migration File Watcher: File does not exist!: %s", fileToWatch.getAbsolutePath())); + } + + return ret; + } + + private boolean isReadyForUse(File file) throws InterruptedException { + Long fileSizeBefore = file.length(); + Thread.sleep(getCheckInterval()); + Long fileSizeAfter = file.length(); + boolean ret = fileSizeBefore.equals(fileSizeAfter); + + if (ret) { + LOG.info(String.format("Migration File Watcher: %s: File is ready for use!", file.getAbsolutePath())); + } else { + incrementCheckCounter(); + LOG.info( + String.format("Migration File Watcher: File is being written: Pause: %,d secs: New size: %,d." + , getCheckInterval() / 1000 + , fileSizeAfter)); + } + + return ret; + } + + private int getCheckInterval() { + return (PAUSE_INTERVAL * (checkIncrement)); + } + + private int incrementCheckCounter() { + if (checkIncrement > MAX_TIMES_PAUSE) { + checkIncrement = 1; + } + + return (PAUSE_INTERVAL * (checkIncrement++)); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java new file mode 100644 index 0000000..ca0bc41 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.migration; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.repository.impexp.ImportService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; + +public class ZipFileMigrationImporter implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class); + + private static String ENV_USER_NAME = "user.name"; + + private final ImportService importService; + private final String fileToImport; + + public ZipFileMigrationImporter(ImportService importService, String fileName) { + this.importService = importService; + this.fileToImport = fileName; + } + + @Override + public void run() { + try { + FileWatcher fileWatcher = new FileWatcher(fileToImport); + fileWatcher.start(); + + performImport(new FileInputStream(new File(fileToImport))); + } catch (IOException e) { + LOG.error("Migration Import: IO Error!", e); + } catch (AtlasBaseException e) { + LOG.error("Migration Import: Error!", e); + } + } + + private void performImport(InputStream fs) throws AtlasBaseException { + try { + LOG.info("Migration Import: {}: Starting...", fileToImport); + + RequestContext.get().setUser(getUserNameFromEnvironment(), null); + + importService.run(fs, getImportRequest(), + getUserNameFromEnvironment(), + InetAddress.getLocalHost().getHostName(), + InetAddress.getLocalHost().getHostAddress()); + + } catch (Exception ex) { + LOG.error("Error loading zip for migration", ex); + throw new AtlasBaseException(ex); + } finally { + LOG.info("Migration Import: {}: Done!", fileToImport); + } + } + + private String getUserNameFromEnvironment() { + return System.getProperty(ENV_USER_NAME); + } + + private AtlasImportRequest getImportRequest() throws AtlasException { + return new AtlasImportRequest(); + } + + private String getPropertyValue(String property, String defaultValue) throws AtlasException { + return ApplicationProperties.get().getString(property, defaultValue); + } +}
