This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e035c4  ATLAS-3595, ATLAS-3603: Auto-start migration import for ZIP file-based migration.
3e035c4 is described below

commit 3e035c45a9dd695471d0927bbcdd72108094c828
Author: Nikhil Bonte <[email protected]>
AuthorDate: Fri Feb 14 09:07:42 2020 -0800

    ATLAS-3595, ATLAS-3603: Auto-start migration import for ZIP file-based migration.
    
    Signed-off-by: Ashutosh Mestry <[email protected]>
---
 .../java/org/apache/atlas/service/Services.java    |  16 ++-
 .../repository/migration/DataMigrationService.java |  19 ++-
 .../atlas/repository/migration/FileWatcher.java    | 149 +++++++++++++++++++++
 .../migration/ZipFileMigrationImporter.java        |  93 +++++++++++++
 4 files changed, 271 insertions(+), 6 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/service/Services.java 
b/common/src/main/java/org/apache/atlas/service/Services.java
index 7b36db5..75eba03 100644
--- a/common/src/main/java/org/apache/atlas/service/Services.java
+++ b/common/src/main/java/org/apache/atlas/service/Services.java
@@ -40,10 +40,12 @@ import static 
org.apache.atlas.AtlasConstants.ATLAS_SERVICES_ENABLED;
 public class Services {
     public static final Logger LOG = LoggerFactory.getLogger(Services.class);
     private static final String DATA_MIGRATION_CLASS_NAME_DEFAULT = 
"DataMigrationService";
+    private static final String FILE_EXTENSION_ZIP = ".zip";
 
     private final List<Service> services;
     private final String        dataMigrationClassName;
     private final boolean       servicesEnabled;
+    private final String        migrationDirName;
     private final boolean       migrationEnabled;
 
     @Inject
@@ -51,7 +53,8 @@ public class Services {
         this.services               = services;
         this.dataMigrationClassName = 
configuration.getString("atlas.migration.class.name", 
DATA_MIGRATION_CLASS_NAME_DEFAULT);
         this.servicesEnabled        = 
configuration.getBoolean(ATLAS_SERVICES_ENABLED, true);
-        this.migrationEnabled       = 
StringUtils.isNotEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME));
+        this.migrationDirName       = 
configuration.getString(ATLAS_MIGRATION_MODE_FILENAME);
+        this.migrationEnabled       = StringUtils.isNotEmpty(migrationDirName);
     }
 
     @PostConstruct
@@ -92,11 +95,22 @@ public class Services {
     private boolean isServiceUsed(Service service) {
         if (isDataMigrationService(service)) {
             return migrationEnabled;
+        } else if (isZipFileMigration()) {
+            return isNeededForZipFileMigration(service);
         } else {
             return !migrationEnabled && servicesEnabled;
         }
     }
 
+    /**
+     * Tells whether migration is enabled and the configured migration path
+     * ends (case-insensitively) with the ZIP file extension.
+     *
+     * @return true only when both conditions hold
+     */
+    private boolean isZipFileMigration() {
+        if (!migrationEnabled) {
+            return false;
+        }
+
+        return StringUtils.endsWithIgnoreCase(migrationDirName, FILE_EXTENSION_ZIP);
+    }
+
+    /**
+     * Decides, by the simple name of the service's direct superclass, whether the
+     * service is kept running during a ZIP file-based migration.
+     *
+     * @param svc service being considered
+     * @return true when the direct superclass is named AbstractStorageBasedAuditRepository
+     *         or AbstractNotification
+     */
+    private boolean isNeededForZipFileMigration(Service svc) {
+        // Only the immediate superclass is inspected, exactly one level up.
+        String parentSimpleName = svc.getClass().getSuperclass().getSimpleName();
+
+        if (parentSimpleName.equals("AbstractStorageBasedAuditRepository")) {
+            return true;
+        }
+
+        return parentSimpleName.equals("AbstractNotification");
+    }
+
     private boolean isDataMigrationService(Service svc) {
         return svc.getClass().getSimpleName().equals(dataMigrationClassName);
     }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
 
b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
index 1701361..0a2257e 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
@@ -18,20 +18,21 @@
 
 package org.apache.atlas.repository.migration;
 
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.GraphDBMigrator;
+import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.ImportTypeDefProcessor;
 import 
org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
+import org.apache.atlas.service.Service;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.configuration.Configuration;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.service.Service;
 import org.apache.commons.io.FileUtils;
-import org.apache.solr.common.StringUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -48,6 +49,8 @@ import static 
org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
 public class DataMigrationService implements Service {
     private static final Logger LOG = 
LoggerFactory.getLogger(DataMigrationService.class);
 
+    private static final String FILE_EXTENSION_ZIP = ".zip";
+
     private static String ATLAS_MIGRATION_DATA_NAME     = 
"atlas-migration-data.json";
     private static String ATLAS_MIGRATION_TYPESDEF_NAME = 
"atlas-migration-typesdef.json";
 
@@ -57,9 +60,15 @@ public class DataMigrationService implements Service {
     @Inject
     public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore 
typeDefStore, Configuration configuration,
                                 GraphBackedSearchIndexer indexer, 
AtlasTypeDefStoreInitializer storeInitializer,
-                                AtlasTypeRegistry typeRegistry) {
+                                AtlasTypeRegistry typeRegistry, ImportService 
importService) {
         this.configuration = configuration;
-        this.thread        = new Thread(new FileImporter(migrator, 
typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
+
+
+        String fileName = getFileName();
+        boolean zipFileBasedMigrationImport = 
StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP);
+        this.thread        = (zipFileBasedMigrationImport)
+            ?  new Thread(new ZipFileMigrationImporter(importService, 
fileName), "zipFileBasedMigrationImporter")
+            :  new Thread(new FileImporter(migrator, typeDefStore, 
typeRegistry, storeInitializer, fileName, indexer));
     }
 
     @Override
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java
 
b/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java
new file mode 100644
index 0000000..d40015a
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/FileWatcher.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.migration;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+public class FileWatcher {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileWatcher.class);
+
+    private final static int MAX_TIMES_PAUSE = 10;
+    private final static int PAUSE_INTERVAL = 5000; // 5 secs
+
+    private int checkIncrement;
+    private final File fileToWatch;
+
+    public FileWatcher(String filePath) {
+        this.checkIncrement = 1;
+        this.fileToWatch = new File(filePath);
+    }
+
+    public void start() throws IOException {
+        if (existsAndReadyCheck()) {
+            return;
+        }
+
+        WatchService watcher = FileSystems.getDefault().newWatchService();
+        Path pathToWatch = 
FileSystems.getDefault().getPath(fileToWatch.getParent());
+        register(watcher, pathToWatch);
+
+        try {
+            LOG.info(String.format("Migration File Watcher: Watching: %s", 
fileToWatch.toString()));
+            startWatching(watcher);
+        } catch (InterruptedException ex) {
+            LOG.error("Migration File Watcher: Interrupted!");
+        } finally {
+            watcher.close();
+        }
+    }
+
+    private void startWatching(WatchService watcher) throws 
InterruptedException {
+        while (true) {
+            WatchKey watchKey = watcher.take();
+            if (watchKey == null) {
+                continue;
+            }
+
+            for (WatchEvent event : watchKey.pollEvents()) {
+                if (checkIfFileAvailableAndReady(event)) {
+                    return;
+                }
+            }
+
+            watchKey.reset();
+        }
+    }
+
+    private void register(WatchService watcher, Path path) throws IOException {
+        try {
+            path.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, 
StandardWatchEventKinds.ENTRY_MODIFY);
+        } catch (IOException e) {
+            LOG.error("Migration File Watcher: Error while registering event 
{}!", path);
+            throw e;
+        }
+    }
+
+    private boolean checkIfFileAvailableAndReady(WatchEvent event) {
+        WatchEvent<Path> watchEvent = event;
+        Path path = watchEvent.context();
+
+        if (!path.toString().equals(fileToWatch.getName())) {
+            return false;
+        }
+
+        return existsAndReadyCheck();
+    }
+
+    private boolean existsAndReadyCheck() {
+        boolean ret = fileToWatch.exists() && fileToWatch.canRead();
+        if (ret) {
+            try {
+                return isReadyForUse(fileToWatch);
+            } catch (InterruptedException e) {
+                LOG.error("Migration File Watcher: Interrupted {}!", 
fileToWatch);
+                return false;
+            }
+        } else {
+            LOG.info(String.format("Migration File Watcher: File does not 
exist!: %s", fileToWatch.getAbsolutePath()));
+        }
+
+        return ret;
+    }
+
+    private boolean isReadyForUse(File file) throws InterruptedException {
+        Long fileSizeBefore = file.length();
+        Thread.sleep(getCheckInterval());
+        Long fileSizeAfter = file.length();
+        boolean ret = fileSizeBefore.equals(fileSizeAfter);
+
+        if (ret) {
+            LOG.info(String.format("Migration File Watcher: %s: File is ready 
for use!", file.getAbsolutePath()));
+        } else {
+            incrementCheckCounter();
+            LOG.info(
+                    String.format("Migration File Watcher: File is being 
written: Pause: %,d secs: New size: %,d."
+                            , getCheckInterval() / 1000
+                            , fileSizeAfter));
+        }
+
+        return ret;
+    }
+
+    private int getCheckInterval() {
+        return (PAUSE_INTERVAL * (checkIncrement));
+    }
+
+    private int incrementCheckCounter() {
+        if (checkIncrement > MAX_TIMES_PAUSE) {
+            checkIncrement = 1;
+        }
+
+        return (PAUSE_INTERVAL * (checkIncrement++));
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
 
b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
new file mode 100644
index 0000000..ca0bc41
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.migration;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.repository.impexp.ImportService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+
+/**
+ * Runnable that waits for the configured ZIP file to become available (via
+ * {@link FileWatcher}) and then hands it to the {@link ImportService}.
+ */
+public class ZipFileMigrationImporter implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
+
+    // Name of the JVM system property read to obtain the user the import runs as.
+    private static final String ENV_USER_NAME = "user.name";
+
+    private final ImportService importService;
+    private final String fileToImport;
+
+    /**
+     * @param importService service that performs the actual import
+     * @param fileName      path of the ZIP file to wait for and import
+     */
+    public ZipFileMigrationImporter(ImportService importService, String fileName) {
+        this.importService = importService;
+        this.fileToImport = fileName;
+    }
+
+    /**
+     * Waits for the file, then imports it. Failures are logged, not rethrown.
+     */
+    @Override
+    public void run() {
+        try {
+            FileWatcher fileWatcher = new FileWatcher(fileToImport);
+            fileWatcher.start();
+
+            // If the wait was cut short by an interrupt, the file may not be complete.
+            if (Thread.currentThread().isInterrupted()) {
+                LOG.warn("Migration Import: {}: Interrupted while waiting for file. Import skipped!", fileToImport);
+                return;
+            }
+
+            // Close the stream whether or not the import succeeds.
+            try (InputStream fs = new FileInputStream(new File(fileToImport))) {
+                performImport(fs);
+            }
+        } catch (IOException e) {
+            LOG.error("Migration Import: IO Error!", e);
+        } catch (AtlasBaseException e) {
+            LOG.error("Migration Import: Error!", e);
+        }
+    }
+
+    /**
+     * Sets the request user and runs the import on the given stream.
+     *
+     * @param fs stream over the ZIP file; not closed by this method
+     * @throws AtlasBaseException wrapping any failure raised while importing
+     */
+    private void performImport(InputStream fs) throws AtlasBaseException {
+        try {
+            LOG.info("Migration Import: {}: Starting...", fileToImport);
+
+            String userName = getUserNameFromEnvironment();
+            RequestContext.get().setUser(userName, null);
+
+            // Resolve the local host once; host name and address come from the same lookup.
+            InetAddress localHost = InetAddress.getLocalHost();
+
+            importService.run(fs, getImportRequest(),
+                    userName,
+                    localHost.getHostName(),
+                    localHost.getHostAddress());
+
+        } catch (Exception ex) {
+            LOG.error("Error loading zip for migration", ex);
+            throw new AtlasBaseException(ex);
+        } finally {
+            // Logged on both success and failure.
+            LOG.info("Migration Import: {}: Done!", fileToImport);
+        }
+    }
+
+    /**
+     * @return value of the "user.name" system property
+     */
+    private String getUserNameFromEnvironment() {
+        return System.getProperty(ENV_USER_NAME);
+    }
+
+    /**
+     * @return a default-constructed import request
+     */
+    private AtlasImportRequest getImportRequest() throws AtlasException {
+        return new AtlasImportRequest();
+    }
+
+    /**
+     * Reads a string property from the application configuration.
+     * NOTE(review): not called anywhere within this class.
+     */
+    private String getPropertyValue(String property, String defaultValue) throws AtlasException {
+        return ApplicationProperties.get().getString(property, defaultValue);
+    }
+}

Reply via email to