This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 54042d35b29f91b46fd033a6378dedf1ff47c5d9
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Thu Feb 20 17:04:49 2020 -0800

    DataMigration: Automatic resume.
---
 addons/models/0000-Area0/0010-base_model.json      |  50 ++++++++++
 .../model/migration/MigrationImportStatus.java     |  98 +++++++++++++++++++
 .../repository/migration/DataMigrationService.java |   4 +-
 .../migration/DataMigrationStatusService.java      | 104 +++++++++++++++++++++
 .../migration/ZipFileMigrationImporter.java        |  33 +++++--
 .../repository/ogm/MigrationImportStatusDTO.java   | 103 ++++++++++++++++++++
 .../store/graph/v2/BulkImporterImpl.java           |   8 +-
 .../store/graph/v2/bulkimport/MigrationImport.java |  13 ++-
 .../v2/bulkimport/pc/EntityCreationManager.java    |  15 ++-
 9 files changed, 404 insertions(+), 24 deletions(-)

diff --git a/addons/models/0000-Area0/0010-base_model.json 
b/addons/models/0000-Area0/0010-base_model.json
index 6bdd2f7..001bb6c 100644
--- a/addons/models/0000-Area0/0010-base_model.json
+++ b/addons/models/0000-Area0/0010-base_model.json
@@ -256,6 +256,56 @@
       ]
     },
     {
+      "name": "__MigrationImportStatus",
+      "superTypes": [
+        "__internal"
+      ],
+      "serviceType": "atlas_core",
+      "typeVersion": "1.0",
+      "attributeDefs": [
+        {
+          "name": "name",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": false,
+          "isUnique": true
+        },
+        {
+          "name": "size",
+          "typeName": "int",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "position",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "startTime",
+          "typeName": "long",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "endTime",
+          "typeName": "long",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        }
+      ]
+    },
+    {
       "name": "__AtlasUserSavedSearch",
       "superTypes": [
         "__internal"
diff --git 
a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
 
b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
new file mode 100644
index 0000000..e3f1326
--- /dev/null
+++ 
b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.migration;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MigrationImportStatus extends AtlasBaseModelObject implements 
Serializable {
+    private String name;
+    private int size;
+    private long startTime;
+    private long endTime;
+    private String position;
+
+    public MigrationImportStatus() {
+    }
+
+    public MigrationImportStatus(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getSize() {
+        return size;
+    }
+
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public void setPosition(String position) {
+        this.position = position;
+    }
+
+    public String getPosition() {
+        return this.position;
+    }
+
+    @Override
+    protected StringBuilder toString(StringBuilder sb) {
+        sb.append(", name=").append(name);
+        sb.append(", size=").append(size);
+        sb.append(", startTime=").append(startTime);
+        sb.append(", endTime=").append(endTime);
+
+        return sb;
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
 
b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
index 0a2257e..48f2a2f 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
@@ -60,14 +60,14 @@ public class DataMigrationService implements Service {
     @Inject
     public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore 
typeDefStore, Configuration configuration,
                                 GraphBackedSearchIndexer indexer, 
AtlasTypeDefStoreInitializer storeInitializer,
-                                AtlasTypeRegistry typeRegistry, ImportService 
importService) {
+                                AtlasTypeRegistry typeRegistry, ImportService 
importService, DataMigrationStatusService dataMigrationStatusService) {
         this.configuration = configuration;
 
 
         String fileName = getFileName();
         boolean zipFileBasedMigrationImport = 
StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP);
         this.thread        = (zipFileBasedMigrationImport)
-            ?  new Thread(new ZipFileMigrationImporter(importService, 
fileName), "zipFileBasedMigrationImporter")
+            ?  new Thread(new ZipFileMigrationImporter(importService, 
fileName, dataMigrationStatusService), "zipFileBasedMigrationImporter")
             :  new Thread(new FileImporter(migrator, typeDefStore, 
typeRegistry, storeInitializer, fileName, indexer));
     }
 
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
 
b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
new file mode 100644
index 0000000..b5285d0
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.migration;
+
+import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+
+@AtlasService
+public class DataMigrationStatusService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataMigrationStatusService.class);
+
+    private final DataAccess dataAccess;
+    private MigrationImportStatus status;
+
+    @Inject
+    public DataMigrationStatusService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    public MigrationImportStatus getCreate(MigrationImportStatus status) {
+        try {
+            this.status = this.dataAccess.load(status);
+            this.status.setSize(status.getSize());
+            this.status.setStartTime(status.getStartTime());
+
+            this.status = dataAccess.save(this.status);
+        } catch (Exception ex) {
+            LOG.info("DataMigrationStatusService: Setting status: {}...", 
status.getName());
+            try {
+                this.status = dataAccess.save(status);
+            } catch (AtlasBaseException e) {
+                LOG.info("DataMigrationStatusService: Error saving status: 
{}...", status.getName());
+            }
+        }
+
+        return this.status;
+    }
+
+    public MigrationImportStatus get() {
+        return this.status;
+    }
+
+    public MigrationImportStatus getByName(String name) throws 
AtlasBaseException {
+        MigrationImportStatus status = new MigrationImportStatus(name);
+
+        return dataAccess.load(status);
+    }
+
+    public void deleteStatus() throws AtlasBaseException {
+        if (this.status == null) {
+            return;
+        }
+
+        MigrationImportStatus status = getByName(this.status.getName());
+        dataAccess.delete(status.getGuid());
+    }
+
+    public void savePosition(String position) {
+        this.status.setPosition(position);
+        try {
+            this.dataAccess.saveNoLoad(this.status);
+        } catch (AtlasBaseException e) {
+            LOG.error("Error saving status: {}", position, e);
+        }
+    }
+
+    public void setEndTime() {
+        this.status.setEndTime(System.currentTimeMillis());
+        try {
+            this.dataAccess.saveNoLoad(this.status);
+        } catch (AtlasBaseException e) {
+            LOG.error("Error saving status: endTime", e);
+        }
+    }
+
+    public MigrationImportStatus createGet(String fileToImport, int 
streamSize) {
+        MigrationImportStatus status = new MigrationImportStatus(fileToImport);
+        status.setSize(streamSize);
+
+        return getCreate(status);
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
 
b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index 69d78cd..72ffab4 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -50,20 +51,23 @@ public class ZipFileMigrationImporter implements Runnable {
 
     private final ImportService importService;
     private final String fileToImport;
+    private DataMigrationStatusService dataMigrationStatusService;
 
-    public ZipFileMigrationImporter(ImportService importService, String 
fileName) {
+    public ZipFileMigrationImporter(ImportService importService, String 
fileName, DataMigrationStatusService dataMigrationStatusService) {
         this.importService = importService;
         this.fileToImport = fileName;
+        this.dataMigrationStatusService = dataMigrationStatusService;
     }
 
     @Override
     public void run() {
         try {
-            FileWatcher fileWatcher = new FileWatcher(fileToImport);
-            fileWatcher.start();
+            detectFileToImport();
 
             int streamSize = getStreamSizeFromComment(fileToImport);
-            performImport(new FileInputStream(new File(fileToImport)), 
streamSize);
+            MigrationImportStatus status = 
dataMigrationStatusService.createGet(fileToImport, streamSize);
+            performImport(new FileInputStream(new File(fileToImport)), 
status.getPosition(), streamSize);
+            dataMigrationStatusService.setEndTime();
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -71,6 +75,11 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }
 
+    private void detectFileToImport() throws IOException {
+        FileWatcher fileWatcher = new FileWatcher(fileToImport);
+        fileWatcher.start();
+    }
+
     private int getStreamSizeFromComment(String fileToImport) {
         int ret = 1;
         try {
@@ -96,13 +105,13 @@ public class ZipFileMigrationImporter implements Runnable {
         return Integer.valueOf(s);
     }
 
-    private void performImport(InputStream fs, int streamSize) throws 
AtlasBaseException {
+    private void performImport(InputStream fs, String position, int 
streamSize) throws AtlasBaseException {
         try {
-            LOG.info("Migration Import: {}: Starting...", fileToImport);
+            LOG.info("Migration Import: {}: Position: {}: Starting...", 
fileToImport, position);
 
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
-            importService.run(fs, getImportRequest(streamSize),
+            importService.run(fs, getImportRequest(streamSize, position),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
@@ -112,6 +121,7 @@ public class ZipFileMigrationImporter implements Runnable {
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);
+            dataMigrationStatusService.deleteStatus();
         }
     }
 
@@ -119,14 +129,19 @@ public class ZipFileMigrationImporter implements Runnable 
{
         return System.getProperty(ENV_USER_NAME);
     }
 
-    private AtlasImportRequest getImportRequest(int streamSize) throws 
AtlasException {
+    private AtlasImportRequest getImportRequest(int streamSize, String 
position) throws AtlasException {
         AtlasImportRequest request = new AtlasImportRequest();
 
         request.setSizeOption(streamSize);
         request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
         request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, 
getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, 
DEFAULT_NUMBER_OF_WORKDERS));
         request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, 
getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, 
DEFAULT_BATCH_SIZE));
-        request.setOption(AtlasImportRequest.START_POSITION_KEY, 
Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
+
+        request.setOption(AtlasImportRequest.START_POSITION_KEY,
+                        (StringUtils.isEmpty(position)
+                        ? 
Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())
+                        : position)
+                );
 
         return request;
     }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
 
b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
new file mode 100644
index 0000000..be541cd
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.ogm;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Component
+public class MigrationImportStatusDTO extends 
AbstractDataTransferObject<MigrationImportStatus> {
+    public static final String PROPERTY_NAME = "name";
+    public static final String PROPERTY_SIZE = "size";
+    public static final String PROPERTY_POSITION = "position";
+    public static final String PROPERTY_START_TIME = "startTime";
+    public static final String PROPERTY_END_TIME = "endTime";
+    public static final String PROPERTY_ADDITIONAL_INFO = "additionalInfo";
+
+    private static final Set<String> ATTRIBUTE_NAMES = new 
HashSet<>(Arrays.asList(PROPERTY_NAME,
+            PROPERTY_SIZE, PROPERTY_POSITION,
+            PROPERTY_START_TIME, PROPERTY_END_TIME,
+            PROPERTY_ADDITIONAL_INFO));
+
+    @Inject
+    public MigrationImportStatusDTO(AtlasTypeRegistry typeRegistry) {
+        super(typeRegistry, MigrationImportStatus.class, 
Constants.INTERNAL_PROPERTY_KEY_PREFIX + 
MigrationImportStatus.class.getSimpleName());
+    }
+
+    public static Set<String> getAttributes() {
+        return ATTRIBUTE_NAMES;
+    }
+
+    public static MigrationImportStatus from(String guid, Map<String,Object> 
attributes) {
+        MigrationImportStatus entry = new MigrationImportStatus();
+
+        entry.setGuid(guid);
+        entry.setName((String) attributes.get(PROPERTY_NAME));
+        entry.setSize((int) attributes.get(PROPERTY_SIZE));
+        entry.setPosition((String) attributes.get(PROPERTY_POSITION));
+        entry.setStartTime((long) attributes.get(PROPERTY_START_TIME));
+        entry.setEndTime((long) attributes.get(PROPERTY_END_TIME));
+
+        return entry;
+    }
+
+    @Override
+    public MigrationImportStatus from(AtlasEntity entity) {
+        return from(entity.getGuid(), entity.getAttributes());
+    }
+
+    @Override
+    public MigrationImportStatus from(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo) {
+        return from(entityWithExtInfo.getEntity());
+    }
+
+    @Override
+    public AtlasEntity toEntity(MigrationImportStatus obj) {
+        AtlasEntity entity = getDefaultAtlasEntity(obj);
+
+        entity.setAttribute(PROPERTY_NAME, obj.getName());
+        entity.setAttribute(PROPERTY_SIZE, obj.getSize());
+        entity.setAttribute(PROPERTY_POSITION, obj.getPosition());
+        entity.setAttribute(PROPERTY_START_TIME, obj.getStartTime());
+        entity.setAttribute(PROPERTY_END_TIME, obj.getEndTime());
+
+        return entity;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo 
toEntityWithExtInfo(MigrationImportStatus obj) throws AtlasBaseException {
+        return new AtlasEntity.AtlasEntityWithExtInfo(toEntity(obj));
+    }
+
+    @Override
+    public Map<String, Object> getUniqueAttributes(final MigrationImportStatus 
obj) {
+        return Collections.singletonMap(PROPERTY_NAME, obj.getName());
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 4526002..72b2f4f 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -26,8 +26,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
@@ -53,11 +53,13 @@ public class BulkImporterImpl implements BulkImporter {
 
     private final AtlasEntityStore entityStore;
     private AtlasTypeRegistry typeRegistry;
+    private DataMigrationStatusService dataMigrationStatusService;
 
     @Inject
-    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry 
typeRegistry) {
+    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry 
typeRegistry, DataMigrationStatusService dataMigrationStatusService) {
         this.entityStore = entityStore;
         this.typeRegistry = typeRegistry;
+        this.dataMigrationStatusService = dataMigrationStatusService;
     }
 
     @Override
@@ -65,7 +67,7 @@ public class BulkImporterImpl implements BulkImporter {
         ImportStrategy importStrategy =
                 (importResult.getRequest().getOptions() != null &&
                         
importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
-                ? new MigrationImport(new AtlasGraphProvider(), 
this.typeRegistry)
+                ? new MigrationImport(new AtlasGraphProvider(), 
this.typeRegistry, dataMigrationStatusService)
                 : new RegularImport(this.entityStore, this.typeRegistry);
 
         LOG.info("BulkImportImpl: {}", 
importStrategy.getClass().getSimpleName());
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index 8c66656..9819dc2 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -26,6 +26,7 @@ import 
org.apache.atlas.repository.converters.AtlasFormatConverters;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
@@ -44,14 +45,16 @@ public class MigrationImport extends ImportStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(MigrationImport.class);
 
     private final AtlasTypeRegistry typeRegistry;
+    private final DataMigrationStatusService dataMigrationStatusService;
     private AtlasGraph atlasGraph;
     private EntityGraphRetriever entityGraphRetriever;
     private EntityGraphMapper entityGraphMapper;
     private AtlasEntityStore entityStore;
 
-    public MigrationImport(AtlasGraphProvider atlasGraphProvider, 
AtlasTypeRegistry typeRegistry) {
+    public MigrationImport(AtlasGraphProvider atlasGraphProvider, 
AtlasTypeRegistry typeRegistry, DataMigrationStatusService 
dataMigrationStatusService) {
         this.typeRegistry = typeRegistry;
         setupEntityStore(atlasGraphProvider, typeRegistry);
+        this.dataMigrationStatusService = dataMigrationStatusService;
         LOG.info("MigrationImport: Using bulkLoading...");
     }
 
@@ -67,11 +70,11 @@ public class MigrationImport extends ImportStrategy {
         int index = 0;
         int streamSize = entityStream.size();
         EntityMutationResponse ret = new EntityMutationResponse();
-        EntityCreationManager creationManager = 
createEntityCreationManager(atlasGraph, importResult, streamSize);
+        EntityCreationManager creationManager = 
createEntityCreationManager(atlasGraph, dataMigrationStatusService, 
importResult, streamSize);
 
         try {
             LOG.info("Migration Import: Size: {}: Starting...", streamSize);
-            index = creationManager.read(entityStream);
+            index = creationManager.read(entityStream, 
importResult.getRequest().getStartPosition());
             creationManager.drain();
             creationManager.extractResults();
         } catch (Exception ex) {
@@ -84,14 +87,14 @@ public class MigrationImport extends ImportStrategy {
         return ret;
     }
 
-    private EntityCreationManager createEntityCreationManager(AtlasGraph 
threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
+    private EntityCreationManager createEntityCreationManager(AtlasGraph 
threadedAtlasGraph, DataMigrationStatusService dataMigrationStatusService, 
AtlasImportResult importResult, int streamSize) {
         int batchSize = importResult.getRequest().getOptionKeyBatchSize();
         int numWorkers = 
getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
 
         EntityConsumerBuilder consumerBuilder =
                 new EntityConsumerBuilder(threadedAtlasGraph, entityStore, 
entityGraphRetriever, typeRegistry, batchSize);
 
-        return new EntityCreationManager(consumerBuilder, batchSize, 
numWorkers, importResult, streamSize);
+        return new EntityCreationManager(consumerBuilder, batchSize, 
numWorkers, dataMigrationStatusService, importResult, streamSize);
     }
 
     private static int getNumWorkers(int numWorkersFromOptions) {
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
index 0051941..89c5429 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.pc.WorkItemBuilder;
 import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
 import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.commons.lang.StringUtils;
@@ -30,25 +31,27 @@ import org.slf4j.LoggerFactory;
 
 public class EntityCreationManager<AtlasEntityWithExtInfo> extends 
WorkItemManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(EntityCreationManager.class);
+    private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; 
// 5 min
     private static final String WORKER_PREFIX = "migration-import";
 
     private final StatusReporter<String, String> statusReporter;
+    private final DataMigrationStatusService dataMigrationStatusService;
     private final AtlasImportResult importResult;
     private final int streamSize;
-    private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
     private String currentTypeName;
     private float currentPercent;
 
-    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int 
numWorkers, AtlasImportResult importResult, int streamSize) {
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int 
numWorkers, DataMigrationStatusService dataMigrationStatusService, 
AtlasImportResult importResult, int streamSize) {
         super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
+        this.dataMigrationStatusService = dataMigrationStatusService;
         this.importResult = importResult;
         this.streamSize = streamSize;
 
         this.statusReporter = new 
StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
     }
 
-    public int read(EntityImportStream entityStream) {
-        int currentIndex = 0;
+    public int read(EntityImportStream entityStream, String startPosition) {
+        int currentIndex = StringUtils.isEmpty(startPosition) ? 0 : 
Integer.valueOf(startPosition);
         AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
         while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) 
!= null) {
             AtlasEntity entity = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null;
@@ -103,8 +106,10 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> 
extends WorkItemManag
             return;
         }
 
+        String currentPosition = split[1];
+        dataMigrationStatusService.savePosition(currentPosition);
         importResult.incrementMeticsCounter(split[0]);
-        this.currentPercent = updateImportMetrics(split[0], 
Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
+        this.currentPercent = updateImportMetrics(split[0], 
Integer.parseInt(currentPosition), getStreamSize(), getCurrentPercent());
     }
 
     private static float updateImportMetrics(String typeNameGuid, int 
currentIndex, int streamSize, float currentPercent) {

Reply via email to