This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit b02443ec117458254f919ec60d2dee5fdf3ef058
Author: nixonrodrigues <[email protected]>
AuthorDate: Wed Feb 26 14:51:18 2020 +0530

    Revert "DataMigration: Automatic resume."
    
    This reverts commit 54042d35b29f91b46fd033a6378dedf1ff47c5d9.
---
 addons/models/0000-Area0/0010-base_model.json      |  50 ----------
 .../model/migration/MigrationImportStatus.java     |  98 -------------------
 .../repository/migration/DataMigrationService.java |   4 +-
 .../migration/DataMigrationStatusService.java      | 104 ---------------------
 .../migration/ZipFileMigrationImporter.java        |  33 ++-----
 .../repository/ogm/MigrationImportStatusDTO.java   | 103 --------------------
 .../store/graph/v2/BulkImporterImpl.java           |   8 +-
 .../store/graph/v2/bulkimport/MigrationImport.java |  13 +--
 .../v2/bulkimport/pc/EntityCreationManager.java    |  15 +--
 9 files changed, 24 insertions(+), 404 deletions(-)

diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json
index 001bb6c..6bdd2f7 100644
--- a/addons/models/0000-Area0/0010-base_model.json
+++ b/addons/models/0000-Area0/0010-base_model.json
@@ -256,56 +256,6 @@
       ]
     },
     {
-      "name": "__MigrationImportStatus",
-      "superTypes": [
-        "__internal"
-      ],
-      "serviceType": "atlas_core",
-      "typeVersion": "1.0",
-      "attributeDefs": [
-        {
-          "name": "name",
-          "typeName": "string",
-          "cardinality": "SINGLE",
-          "isIndexable": true,
-          "isOptional": false,
-          "isUnique": true
-        },
-        {
-          "name": "size",
-          "typeName": "int",
-          "cardinality": "SINGLE",
-          "isIndexable": true,
-          "isOptional": true,
-          "isUnique": false
-        },
-        {
-          "name": "position",
-          "typeName": "string",
-          "cardinality": "SINGLE",
-          "isIndexable": true,
-          "isOptional": true,
-          "isUnique": false
-        },
-        {
-          "name": "startTime",
-          "typeName": "long",
-          "cardinality": "SINGLE",
-          "isIndexable": true,
-          "isOptional": true,
-          "isUnique": false
-        },
-        {
-          "name": "endTime",
-          "typeName": "long",
-          "cardinality": "SINGLE",
-          "isIndexable": true,
-          "isOptional": true,
-          "isUnique": false
-        }
-      ]
-    },
-    {
       "name": "__AtlasUserSavedSearch",
       "superTypes": [
         "__internal"
diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
deleted file mode 100644
index e3f1326..0000000
--- a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.model.migration;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.atlas.model.AtlasBaseModelObject;
-
-import java.io.Serializable;
-import java.util.Date;
-
-import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
-import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
-
-@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MigrationImportStatus extends AtlasBaseModelObject implements 
Serializable {
-    private String name;
-    private int size;
-    private long startTime;
-    private long endTime;
-    private String position;
-
-    public MigrationImportStatus() {
-    }
-
-    public MigrationImportStatus(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public int getSize() {
-        return size;
-    }
-
-    public void setSize(int size) {
-        this.size = size;
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-    }
-
-    public long getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(long endTime) {
-        this.endTime = endTime;
-    }
-
-    public void setPosition(String position) {
-        this.position = position;
-    }
-
-    public String getPosition() {
-        return this.position;
-    }
-
-    @Override
-    protected StringBuilder toString(StringBuilder sb) {
-        sb.append(", name=").append(name);
-        sb.append(", size=").append(size);
-        sb.append(", startTime=").append(startTime);
-        sb.append(", endTime=").append(endTime);
-
-        return sb;
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
index 48f2a2f..0a2257e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java
@@ -60,14 +60,14 @@ public class DataMigrationService implements Service {
     @Inject
     public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore 
typeDefStore, Configuration configuration,
                                 GraphBackedSearchIndexer indexer, 
AtlasTypeDefStoreInitializer storeInitializer,
-                                AtlasTypeRegistry typeRegistry, ImportService 
importService, DataMigrationStatusService dataMigrationStatusService) {
+                                AtlasTypeRegistry typeRegistry, ImportService 
importService) {
         this.configuration = configuration;
 
 
         String fileName = getFileName();
         boolean zipFileBasedMigrationImport = 
StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP);
         this.thread        = (zipFileBasedMigrationImport)
-            ?  new Thread(new ZipFileMigrationImporter(importService, 
fileName, dataMigrationStatusService), "zipFileBasedMigrationImporter")
+            ?  new Thread(new ZipFileMigrationImporter(importService, 
fileName), "zipFileBasedMigrationImporter")
             :  new Thread(new FileImporter(migrator, typeDefStore, 
typeRegistry, storeInitializer, fileName, indexer));
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
deleted file mode 100644
index b5285d0..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.migration;
-
-import org.apache.atlas.annotation.AtlasService;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.migration.MigrationImportStatus;
-import org.apache.atlas.repository.ogm.DataAccess;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-
-@AtlasService
-public class DataMigrationStatusService {
-    private static final Logger LOG = 
LoggerFactory.getLogger(DataMigrationStatusService.class);
-
-    private final DataAccess dataAccess;
-    private MigrationImportStatus status;
-
-    @Inject
-    public DataMigrationStatusService(DataAccess dataAccess) {
-        this.dataAccess = dataAccess;
-    }
-
-    public MigrationImportStatus getCreate(MigrationImportStatus status) {
-        try {
-            this.status = this.dataAccess.load(status);
-            this.status.setSize(status.getSize());
-            this.status.setStartTime(status.getStartTime());
-
-            this.status = dataAccess.save(this.status);
-        } catch (Exception ex) {
-            LOG.info("DataMigrationStatusService: Setting status: {}...", 
status.getName());
-            try {
-                this.status = dataAccess.save(status);
-            } catch (AtlasBaseException e) {
-                LOG.info("DataMigrationStatusService: Error saving status: 
{}...", status.getName());
-            }
-        }
-
-        return this.status;
-    }
-
-    public MigrationImportStatus get() {
-        return this.status;
-    }
-
-    public MigrationImportStatus getByName(String name) throws 
AtlasBaseException {
-        MigrationImportStatus status = new MigrationImportStatus(name);
-
-        return dataAccess.load(status);
-    }
-
-    public void deleteStatus() throws AtlasBaseException {
-        if (this.status == null) {
-            return;
-        }
-
-        MigrationImportStatus status = getByName(this.status.getName());
-        dataAccess.delete(status.getGuid());
-    }
-
-    public void savePosition(String position) {
-        this.status.setPosition(position);
-        try {
-            this.dataAccess.saveNoLoad(this.status);
-        } catch (AtlasBaseException e) {
-            LOG.error("Error saving status: {}", position, e);
-        }
-    }
-
-    public void setEndTime() {
-        this.status.setEndTime(System.currentTimeMillis());
-        try {
-            this.dataAccess.saveNoLoad(this.status);
-        } catch (AtlasBaseException e) {
-            LOG.error("Error saving status: endTime", e);
-        }
-    }
-
-    public MigrationImportStatus createGet(String fileToImport, int 
streamSize) {
-        MigrationImportStatus status = new MigrationImportStatus(fileToImport);
-        status.setSize(streamSize);
-
-        return getCreate(status);
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index 72ffab4..69d78cd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -23,7 +23,6 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
-import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -51,23 +50,20 @@ public class ZipFileMigrationImporter implements Runnable {
 
     private final ImportService importService;
     private final String fileToImport;
-    private DataMigrationStatusService dataMigrationStatusService;
 
-    public ZipFileMigrationImporter(ImportService importService, String 
fileName, DataMigrationStatusService dataMigrationStatusService) {
+    public ZipFileMigrationImporter(ImportService importService, String 
fileName) {
         this.importService = importService;
         this.fileToImport = fileName;
-        this.dataMigrationStatusService = dataMigrationStatusService;
     }
 
     @Override
     public void run() {
         try {
-            detectFileToImport();
+            FileWatcher fileWatcher = new FileWatcher(fileToImport);
+            fileWatcher.start();
 
             int streamSize = getStreamSizeFromComment(fileToImport);
-            MigrationImportStatus status = 
dataMigrationStatusService.createGet(fileToImport, streamSize);
-            performImport(new FileInputStream(new File(fileToImport)), 
status.getPosition(), streamSize);
-            dataMigrationStatusService.setEndTime();
+            performImport(new FileInputStream(new File(fileToImport)), 
streamSize);
         } catch (IOException e) {
             LOG.error("Migration Import: IO Error!", e);
         } catch (AtlasBaseException e) {
@@ -75,11 +71,6 @@ public class ZipFileMigrationImporter implements Runnable {
         }
     }
 
-    private void detectFileToImport() throws IOException {
-        FileWatcher fileWatcher = new FileWatcher(fileToImport);
-        fileWatcher.start();
-    }
-
     private int getStreamSizeFromComment(String fileToImport) {
         int ret = 1;
         try {
@@ -105,13 +96,13 @@ public class ZipFileMigrationImporter implements Runnable {
         return Integer.valueOf(s);
     }
 
-    private void performImport(InputStream fs, String position, int 
streamSize) throws AtlasBaseException {
+    private void performImport(InputStream fs, int streamSize) throws 
AtlasBaseException {
         try {
-            LOG.info("Migration Import: {}: Position: {}: Starting...", 
fileToImport, position);
+            LOG.info("Migration Import: {}: Starting...", fileToImport);
 
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
-            importService.run(fs, getImportRequest(streamSize, position),
+            importService.run(fs, getImportRequest(streamSize),
                     getUserNameFromEnvironment(),
                     InetAddress.getLocalHost().getHostName(),
                     InetAddress.getLocalHost().getHostAddress());
@@ -121,7 +112,6 @@ public class ZipFileMigrationImporter implements Runnable {
             throw new AtlasBaseException(ex);
         } finally {
             LOG.info("Migration Import: {}: Done!", fileToImport);
-            dataMigrationStatusService.deleteStatus();
         }
     }
 
@@ -129,19 +119,14 @@ public class ZipFileMigrationImporter implements Runnable {
         return System.getProperty(ENV_USER_NAME);
     }
 
-    private AtlasImportRequest getImportRequest(int streamSize, String 
position) throws AtlasException {
+    private AtlasImportRequest getImportRequest(int streamSize) throws 
AtlasException {
         AtlasImportRequest request = new AtlasImportRequest();
 
         request.setSizeOption(streamSize);
         request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
         request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, 
getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, 
DEFAULT_NUMBER_OF_WORKDERS));
         request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, 
getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, 
DEFAULT_BATCH_SIZE));
-
-        request.setOption(AtlasImportRequest.START_POSITION_KEY,
-                        (StringUtils.isEmpty(position)
-                        ? 
Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())
-                        : position)
-                );
+        request.setOption(AtlasImportRequest.START_POSITION_KEY, 
Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
 
         return request;
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
deleted file mode 100644
index be541cd..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.repository.ogm;
-
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.migration.MigrationImportStatus;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.springframework.stereotype.Component;
-
-import javax.inject.Inject;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-@Component
-public class MigrationImportStatusDTO extends 
AbstractDataTransferObject<MigrationImportStatus> {
-    public static final String PROPERTY_NAME = "name";
-    public static final String PROPERTY_SIZE = "size";
-    public static final String PROPERTY_POSITION = "position";
-    public static final String PROPERTY_START_TIME = "startTime";
-    public static final String PROPERTY_END_TIME = "endTime";
-    public static final String PROPERTY_ADDITIONAL_INFO = "additionalInfo";
-
-    private static final Set<String> ATTRIBUTE_NAMES = new 
HashSet<>(Arrays.asList(PROPERTY_NAME,
-            PROPERTY_SIZE, PROPERTY_POSITION,
-            PROPERTY_START_TIME, PROPERTY_END_TIME,
-            PROPERTY_ADDITIONAL_INFO));
-
-    @Inject
-    public MigrationImportStatusDTO(AtlasTypeRegistry typeRegistry) {
-        super(typeRegistry, MigrationImportStatus.class, 
Constants.INTERNAL_PROPERTY_KEY_PREFIX + 
MigrationImportStatus.class.getSimpleName());
-    }
-
-    public static Set<String> getAttributes() {
-        return ATTRIBUTE_NAMES;
-    }
-
-    public static MigrationImportStatus from(String guid, Map<String,Object> 
attributes) {
-        MigrationImportStatus entry = new MigrationImportStatus();
-
-        entry.setGuid(guid);
-        entry.setName((String) attributes.get(PROPERTY_NAME));
-        entry.setSize((int) attributes.get(PROPERTY_SIZE));
-        entry.setPosition((String) attributes.get(PROPERTY_POSITION));
-        entry.setStartTime((long) attributes.get(PROPERTY_START_TIME));
-        entry.setEndTime((long) attributes.get(PROPERTY_END_TIME));
-
-        return entry;
-    }
-
-    @Override
-    public MigrationImportStatus from(AtlasEntity entity) {
-        return from(entity.getGuid(), entity.getAttributes());
-    }
-
-    @Override
-    public MigrationImportStatus from(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo) {
-        return from(entityWithExtInfo.getEntity());
-    }
-
-    @Override
-    public AtlasEntity toEntity(MigrationImportStatus obj) {
-        AtlasEntity entity = getDefaultAtlasEntity(obj);
-
-        entity.setAttribute(PROPERTY_NAME, obj.getName());
-        entity.setAttribute(PROPERTY_SIZE, obj.getSize());
-        entity.setAttribute(PROPERTY_POSITION, obj.getPosition());
-        entity.setAttribute(PROPERTY_START_TIME, obj.getStartTime());
-        entity.setAttribute(PROPERTY_END_TIME, obj.getEndTime());
-
-        return entity;
-    }
-
-    @Override
-    public AtlasEntity.AtlasEntityWithExtInfo 
toEntityWithExtInfo(MigrationImportStatus obj) throws AtlasBaseException {
-        return new AtlasEntity.AtlasEntityWithExtInfo(toEntity(obj));
-    }
-
-    @Override
-    public Map<String, Object> getUniqueAttributes(final MigrationImportStatus 
obj) {
-        return Collections.singletonMap(PROPERTY_NAME, obj.getName());
-    }
-}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 72b2f4f..4526002 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -26,8 +26,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
@@ -53,13 +53,11 @@ public class BulkImporterImpl implements BulkImporter {
 
     private final AtlasEntityStore entityStore;
     private AtlasTypeRegistry typeRegistry;
-    private DataMigrationStatusService dataMigrationStatusService;
 
     @Inject
-    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry 
typeRegistry, DataMigrationStatusService dataMigrationStatusService) {
+    public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry 
typeRegistry) {
         this.entityStore = entityStore;
         this.typeRegistry = typeRegistry;
-        this.dataMigrationStatusService = dataMigrationStatusService;
     }
 
     @Override
@@ -67,7 +65,7 @@ public class BulkImporterImpl implements BulkImporter {
         ImportStrategy importStrategy =
                 (importResult.getRequest().getOptions() != null &&
                         
importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
-                ? new MigrationImport(new AtlasGraphProvider(), 
this.typeRegistry, dataMigrationStatusService)
+                ? new MigrationImport(new AtlasGraphProvider(), 
this.typeRegistry)
                 : new RegularImport(this.entityStore, this.typeRegistry);
 
         LOG.info("BulkImportImpl: {}", 
importStrategy.getClass().getSimpleName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index 9819dc2..8c66656 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -26,7 +26,6 @@ import org.apache.atlas.repository.converters.AtlasFormatConverters;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
@@ -45,16 +44,14 @@ public class MigrationImport extends ImportStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(MigrationImport.class);
 
     private final AtlasTypeRegistry typeRegistry;
-    private final DataMigrationStatusService dataMigrationStatusService;
     private AtlasGraph atlasGraph;
     private EntityGraphRetriever entityGraphRetriever;
     private EntityGraphMapper entityGraphMapper;
     private AtlasEntityStore entityStore;
 
-    public MigrationImport(AtlasGraphProvider atlasGraphProvider, 
AtlasTypeRegistry typeRegistry, DataMigrationStatusService 
dataMigrationStatusService) {
+    public MigrationImport(AtlasGraphProvider atlasGraphProvider, 
AtlasTypeRegistry typeRegistry) {
         this.typeRegistry = typeRegistry;
         setupEntityStore(atlasGraphProvider, typeRegistry);
-        this.dataMigrationStatusService = dataMigrationStatusService;
         LOG.info("MigrationImport: Using bulkLoading...");
     }
 
@@ -70,11 +67,11 @@ public class MigrationImport extends ImportStrategy {
         int index = 0;
         int streamSize = entityStream.size();
         EntityMutationResponse ret = new EntityMutationResponse();
-        EntityCreationManager creationManager = 
createEntityCreationManager(atlasGraph, dataMigrationStatusService, 
importResult, streamSize);
+        EntityCreationManager creationManager = 
createEntityCreationManager(atlasGraph, importResult, streamSize);
 
         try {
             LOG.info("Migration Import: Size: {}: Starting...", streamSize);
-            index = creationManager.read(entityStream, 
importResult.getRequest().getStartPosition());
+            index = creationManager.read(entityStream);
             creationManager.drain();
             creationManager.extractResults();
         } catch (Exception ex) {
@@ -87,14 +84,14 @@ public class MigrationImport extends ImportStrategy {
         return ret;
     }
 
-    private EntityCreationManager createEntityCreationManager(AtlasGraph 
threadedAtlasGraph, DataMigrationStatusService dataMigrationStatusService, 
AtlasImportResult importResult, int streamSize) {
+    private EntityCreationManager createEntityCreationManager(AtlasGraph 
threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
         int batchSize = importResult.getRequest().getOptionKeyBatchSize();
         int numWorkers = 
getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
 
         EntityConsumerBuilder consumerBuilder =
                 new EntityConsumerBuilder(threadedAtlasGraph, entityStore, 
entityGraphRetriever, typeRegistry, batchSize);
 
-        return new EntityCreationManager(consumerBuilder, batchSize, 
numWorkers, dataMigrationStatusService, importResult, streamSize);
+        return new EntityCreationManager(consumerBuilder, batchSize, 
numWorkers, importResult, streamSize);
     }
 
     private static int getNumWorkers(int numWorkersFromOptions) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
index 89c5429..0051941 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -22,7 +22,6 @@ import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.pc.WorkItemBuilder;
 import org.apache.atlas.pc.WorkItemManager;
-import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
 import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.commons.lang.StringUtils;
@@ -31,27 +30,25 @@ import org.slf4j.LoggerFactory;
 
 public class EntityCreationManager<AtlasEntityWithExtInfo> extends 
WorkItemManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(EntityCreationManager.class);
-    private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; 
// 5 min
     private static final String WORKER_PREFIX = "migration-import";
 
     private final StatusReporter<String, String> statusReporter;
-    private final DataMigrationStatusService dataMigrationStatusService;
     private final AtlasImportResult importResult;
     private final int streamSize;
+    private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
     private String currentTypeName;
     private float currentPercent;
 
-    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int 
numWorkers, DataMigrationStatusService dataMigrationStatusService, 
AtlasImportResult importResult, int streamSize) {
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int 
numWorkers, AtlasImportResult importResult, int streamSize) {
         super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
-        this.dataMigrationStatusService = dataMigrationStatusService;
         this.importResult = importResult;
         this.streamSize = streamSize;
 
         this.statusReporter = new 
StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
     }
 
-    public int read(EntityImportStream entityStream, String startPosition) {
-        int currentIndex = StringUtils.isEmpty(startPosition) ? 0 : 
Integer.valueOf(startPosition);
+    public int read(EntityImportStream entityStream) {
+        int currentIndex = 0;
         AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
         while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) 
!= null) {
             AtlasEntity entity = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null;
@@ -106,10 +103,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
             return;
         }
 
-        String currentPosition = split[1];
-        dataMigrationStatusService.savePosition(currentPosition);
         importResult.incrementMeticsCounter(split[0]);
-        this.currentPercent = updateImportMetrics(split[0], 
Integer.parseInt(currentPosition), getStreamSize(), getCurrentPercent());
+        this.currentPercent = updateImportMetrics(split[0], 
Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent());
     }
 
     private static float updateImportMetrics(String typeNameGuid, int 
currentIndex, int streamSize, float currentPercent) {

Reply via email to