This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 3e035c4 ATLAS-3595, ATLAS-3603: Auto-start migration import for ZIP
file-based migration.
3e035c4 is described below
commit 3e035c45a9dd695471d0927bbcdd72108094c828
Author: Nikhil Bonte <[email protected]>
AuthorDate: Fri Feb 14 09:07:42 2020 -0800
ATLAS-3595, ATLAS-3603: Auto-start migration import for ZIP file-based
migration.
Signed-off-by: Ashutosh Mestry <[email protected]>
---
.../java/org/apache/atlas/service/Services.java | 16 ++-
.../repository/migration/DataMigrationService.java | 19 ++-
.../atlas/repository/migration/FileWatcher.java | 149 +++++++++++++++++++++
.../migration/ZipFileMigrationImporter.java | 93 +++++++++++++
4 files changed, 271 insertions(+), 6 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java
index 7b36db5..75eba03 100644
--- a/common/src/main/java/org/apache/atlas/service/Services.java
+++ b/common/src/main/java/org/apache/atlas/service/Services.java
@@ -40,10 +40,12 @@ import static org.apache.atlas.AtlasConstants.ATLAS_SERVICES_ENABLED;
public class Services {
public static final Logger LOG = LoggerFactory.getLogger(Services.class);
private static final String DATA_MIGRATION_CLASS_NAME_DEFAULT =
"DataMigrationService";
+ private static final String FILE_EXTENSION_ZIP = ".zip";
private final List<Service> services;
private final String dataMigrationClassName;
private final boolean servicesEnabled;
+ private final String migrationDirName;
private final boolean migrationEnabled;
@Inject
@@ -51,7 +53,8 @@ public class Services {
this.services = services;
this.dataMigrationClassName =
configuration.getString("atlas.migration.class.name",
DATA_MIGRATION_CLASS_NAME_DEFAULT);
this.servicesEnabled =
configuration.getBoolean(ATLAS_SERVICES_ENABLED, true);
- this.migrationEnabled =
StringUtils.isNotEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME));
+ this.migrationDirName =
configuration.getString(ATLAS_MIGRATION_MODE_FILENAME);
+ this.migrationEnabled = StringUtils.isNotEmpty(migrationDirName);
}
@PostConstruct
@@ -92,11 +95,22 @@ public class Services {
private boolean isServiceUsed(Service service) {
if (isDataMigrationService(service)) {
return migrationEnabled;
+ } else if (isZipFileMigration()) {
+ return isNeededForZipFileMigration(service);
} else {
return !migrationEnabled && servicesEnabled;
}
}
+ private boolean isZipFileMigration() {
+ return migrationEnabled &&
StringUtils.endsWithIgnoreCase(migrationDirName, FILE_EXTENSION_ZIP);
+ }
+
+ private boolean isNeededForZipFileMigration(Service svc) {
+ return
svc.getClass().getSuperclass().getSimpleName().equals("AbstractStorageBasedAuditRepository")
||
+
svc.getClass().getSuperclass().getSimpleName().equals("AbstractNotification");
+ }
+
private boolean isDataMigrationService(Service svc) {
return svc.getClass().getSimpleName().equals(dataMigrationClassName);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
index 1701361..0a2257e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
@@ -18,20 +18,21 @@
package org.apache.atlas.repository.migration;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
+import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ImportTypeDefProcessor;
import
org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
+import org.apache.atlas.service.Service;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.service.Service;
import org.apache.commons.io.FileUtils;
-import org.apache.solr.common.StringUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -48,6 +49,8 @@ import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
public class DataMigrationService implements Service {
private static final Logger LOG =
LoggerFactory.getLogger(DataMigrationService.class);
+ private static final String FILE_EXTENSION_ZIP = ".zip";
+
private static String ATLAS_MIGRATION_DATA_NAME =
"atlas-migration-data.json";
private static String ATLAS_MIGRATION_TYPESDEF_NAME =
"atlas-migration-typesdef.json";
@@ -57,9 +60,15 @@ public class DataMigrationService implements Service {
@Inject
public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore
typeDefStore, Configuration configuration,
GraphBackedSearchIndexer indexer,
AtlasTypeDefStoreInitializer storeInitializer,
- AtlasTypeRegistry typeRegistry) {
+ AtlasTypeRegistry typeRegistry, ImportService
importService) {
this.configuration = configuration;
- this.thread = new Thread(new FileImporter(migrator,
typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
+
+
+ String fileName = getFileName();
+ boolean zipFileBasedMigrationImport =
StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP);
+ this.thread = (zipFileBasedMigrationImport)
+ ? new Thread(new ZipFileMigrationImporter(importService,
fileName), "zipFileBasedMigrationImporter")
+ : new Thread(new FileImporter(migrator, typeDefStore,
typeRegistry, storeInitializer, fileName, indexer));
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java b/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java
new file mode 100644
index 0000000..d40015a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.migration;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+public class FileWatcher {
+ private static final Logger LOG =
LoggerFactory.getLogger(FileWatcher.class);
+
+ private final static int MAX_TIMES_PAUSE = 10;
+ private final static int PAUSE_INTERVAL = 5000; // 5 secs
+
+ private int checkIncrement;
+ private final File fileToWatch;
+
+ public FileWatcher(String filePath) {
+ this.checkIncrement = 1;
+ this.fileToWatch = new File(filePath);
+ }
+
+ public void start() throws IOException {
+ if (existsAndReadyCheck()) {
+ return;
+ }
+
+ WatchService watcher = FileSystems.getDefault().newWatchService();
+ Path pathToWatch =
FileSystems.getDefault().getPath(fileToWatch.getParent());
+ register(watcher, pathToWatch);
+
+ try {
+ LOG.info(String.format("Migration File Watcher: Watching: %s",
fileToWatch.toString()));
+ startWatching(watcher);
+ } catch (InterruptedException ex) {
+ LOG.error("Migration File Watcher: Interrupted!");
+ } finally {
+ watcher.close();
+ }
+ }
+
+ private void startWatching(WatchService watcher) throws
InterruptedException {
+ while (true) {
+ WatchKey watchKey = watcher.take();
+ if (watchKey == null) {
+ continue;
+ }
+
+ for (WatchEvent event : watchKey.pollEvents()) {
+ if (checkIfFileAvailableAndReady(event)) {
+ return;
+ }
+ }
+
+ watchKey.reset();
+ }
+ }
+
+ private void register(WatchService watcher, Path path) throws IOException {
+ try {
+ path.register(watcher, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY);
+ } catch (IOException e) {
+ LOG.error("Migration File Watcher: Error while registering event {}!", path);
+ throw e;
+ }
+ }
+
+ private boolean checkIfFileAvailableAndReady(WatchEvent event) {
+ WatchEvent<Path> watchEvent = event;
+ Path path = watchEvent.context();
+
+ if (!path.toString().equals(fileToWatch.getName())) {
+ return false;
+ }
+
+ return existsAndReadyCheck();
+ }
+
+ private boolean existsAndReadyCheck() {
+ boolean ret = fileToWatch.exists() && fileToWatch.canRead();
+ if (ret) {
+ try {
+ return isReadyForUse(fileToWatch);
+ } catch (InterruptedException e) {
+ LOG.error("Migration File Watcher: Interrupted {}!",
fileToWatch);
+ return false;
+ }
+ } else {
+ LOG.info(String.format("Migration File Watcher: File does not exist!: %s", fileToWatch.getAbsolutePath()));
+ }
+
+ return ret;
+ }
+
+ private boolean isReadyForUse(File file) throws InterruptedException {
+ Long fileSizeBefore = file.length();
+ Thread.sleep(getCheckInterval());
+ Long fileSizeAfter = file.length();
+ boolean ret = fileSizeBefore.equals(fileSizeAfter);
+
+ if (ret) {
+ LOG.info(String.format("Migration File Watcher: %s: File is ready for use!", file.getAbsolutePath()));
+ } else {
+ incrementCheckCounter();
+ LOG.info(
+ String.format("Migration File Watcher: File is being written: Pause: %,d secs: New size: %,d."
+ , getCheckInterval() / 1000
+ , fileSizeAfter));
+ }
+
+ return ret;
+ }
+
+ private int getCheckInterval() {
+ return (PAUSE_INTERVAL * (checkIncrement));
+ }
+
+ private int incrementCheckCounter() {
+ if (checkIncrement > MAX_TIMES_PAUSE) {
+ checkIncrement = 1;
+ }
+
+ return (PAUSE_INTERVAL * (checkIncrement++));
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
new file mode 100644
index 0000000..ca0bc41
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.migration;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.repository.impexp.ImportService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+
+public class ZipFileMigrationImporter implements Runnable {
+ private static final Logger LOG =
LoggerFactory.getLogger(ZipFileMigrationImporter.class);
+
+ private static String ENV_USER_NAME = "user.name";
+
+ private final ImportService importService;
+ private final String fileToImport;
+
+ public ZipFileMigrationImporter(ImportService importService, String
fileName) {
+ this.importService = importService;
+ this.fileToImport = fileName;
+ }
+
+ @Override
+ public void run() {
+ try {
+ FileWatcher fileWatcher = new FileWatcher(fileToImport);
+ fileWatcher.start();
+
+ performImport(new FileInputStream(new File(fileToImport)));
+ } catch (IOException e) {
+ LOG.error("Migration Import: IO Error!", e);
+ } catch (AtlasBaseException e) {
+ LOG.error("Migration Import: Error!", e);
+ }
+ }
+
+ private void performImport(InputStream fs) throws AtlasBaseException {
+ try {
+ LOG.info("Migration Import: {}: Starting...", fileToImport);
+
+ RequestContext.get().setUser(getUserNameFromEnvironment(), null);
+
+ importService.run(fs, getImportRequest(),
+ getUserNameFromEnvironment(),
+ InetAddress.getLocalHost().getHostName(),
+ InetAddress.getLocalHost().getHostAddress());
+
+ } catch (Exception ex) {
+ LOG.error("Error loading zip for migration", ex);
+ throw new AtlasBaseException(ex);
+ } finally {
+ LOG.info("Migration Import: {}: Done!", fileToImport);
+ }
+ }
+
+ private String getUserNameFromEnvironment() {
+ return System.getProperty(ENV_USER_NAME);
+ }
+
+ private AtlasImportRequest getImportRequest() throws AtlasException {
+ return new AtlasImportRequest();
+ }
+
+ private String getPropertyValue(String property, String defaultValue)
throws AtlasException {
+ return ApplicationProperties.get().getString(property, defaultValue);
+ }
+}