This is an automated email from the ASF dual-hosted git repository. nixon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit b02443ec117458254f919ec60d2dee5fdf3ef058 Author: nixonrodrigues <[email protected]> AuthorDate: Wed Feb 26 14:51:18 2020 +0530 Revert "DataMigration: Automatic resume." This reverts commit 54042d35b29f91b46fd033a6378dedf1ff47c5d9. --- addons/models/0000-Area0/0010-base_model.json | 50 ---------- .../model/migration/MigrationImportStatus.java | 98 ------------------- .../repository/migration/DataMigrationService.java | 4 +- .../migration/DataMigrationStatusService.java | 104 --------------------- .../migration/ZipFileMigrationImporter.java | 33 ++----- .../repository/ogm/MigrationImportStatusDTO.java | 103 -------------------- .../store/graph/v2/BulkImporterImpl.java | 8 +- .../store/graph/v2/bulkimport/MigrationImport.java | 13 +-- .../v2/bulkimport/pc/EntityCreationManager.java | 15 +-- 9 files changed, 24 insertions(+), 404 deletions(-) diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json index 001bb6c..6bdd2f7 100644 --- a/addons/models/0000-Area0/0010-base_model.json +++ b/addons/models/0000-Area0/0010-base_model.json @@ -256,56 +256,6 @@ ] }, { - "name": "__MigrationImportStatus", - "superTypes": [ - "__internal" - ], - "serviceType": "atlas_core", - "typeVersion": "1.0", - "attributeDefs": [ - { - "name": "name", - "typeName": "string", - "cardinality": "SINGLE", - "isIndexable": true, - "isOptional": false, - "isUnique": true - }, - { - "name": "size", - "typeName": "int", - "cardinality": "SINGLE", - "isIndexable": true, - "isOptional": true, - "isUnique": false - }, - { - "name": "position", - "typeName": "string", - "cardinality": "SINGLE", - "isIndexable": true, - "isOptional": true, - "isUnique": false - }, - { - "name": "startTime", - "typeName": "long", - "cardinality": "SINGLE", - "isIndexable": true, - "isOptional": true, - "isUnique": false - }, - { - "name": "endTime", - "typeName": "long", - "cardinality": "SINGLE", - "isIndexable": true, - "isOptional": true, - "isUnique": false - } - ] - }, - { "name": "__AtlasUserSavedSearch", "superTypes": [ "__internal" diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java deleted file mode 100644 index e3f1326..0000000 --- a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.model.migration; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.atlas.model.AtlasBaseModelObject; - -import java.io.Serializable; -import java.util.Date; - -import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; -import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; - -@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class MigrationImportStatus extends AtlasBaseModelObject implements Serializable { - private String name; - private int size; - private long startTime; - private long endTime; - private String position; - - public MigrationImportStatus() { - } - - public MigrationImportStatus(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getSize() { - return size; - } - - public void setSize(int size) { - this.size = size; - } - - public long getStartTime() { - return startTime; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - public long getEndTime() { - return endTime; - } - - public void setEndTime(long endTime) { - this.endTime = endTime; - } - - public void setPosition(String position) { - this.position = position; - } - - public String getPosition() { - return this.position; - } - - @Override - protected StringBuilder toString(StringBuilder sb) { - sb.append(", name=").append(name); - sb.append(", size=").append(size); - sb.append(", startTime=").append(startTime); - sb.append(", endTime=").append(endTime); - - return sb; - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java index 48f2a2f..0a2257e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java @@ -60,14 +60,14 @@ public class DataMigrationService implements Service { @Inject public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration, GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer, - AtlasTypeRegistry typeRegistry, ImportService importService, DataMigrationStatusService dataMigrationStatusService) { + AtlasTypeRegistry typeRegistry, ImportService importService) { this.configuration = configuration; String fileName = getFileName(); boolean zipFileBasedMigrationImport = StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP); this.thread = (zipFileBasedMigrationImport) - ? new Thread(new ZipFileMigrationImporter(importService, fileName, dataMigrationStatusService), "zipFileBasedMigrationImporter") + ? new Thread(new ZipFileMigrationImporter(importService, fileName), "zipFileBasedMigrationImporter") : new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, fileName, indexer)); } diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java deleted file mode 100644 index b5285d0..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.migration; - -import org.apache.atlas.annotation.AtlasService; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.migration.MigrationImportStatus; -import org.apache.atlas.repository.ogm.DataAccess; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; - -@AtlasService -public class DataMigrationStatusService { - private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class); - - private final DataAccess dataAccess; - private MigrationImportStatus status; - - @Inject - public DataMigrationStatusService(DataAccess dataAccess) { - this.dataAccess = dataAccess; - } - - public MigrationImportStatus getCreate(MigrationImportStatus status) { - try { - this.status = this.dataAccess.load(status); - this.status.setSize(status.getSize()); - this.status.setStartTime(status.getStartTime()); - - this.status = dataAccess.save(this.status); - } catch (Exception ex) { - LOG.info("DataMigrationStatusService: Setting status: {}...", status.getName()); - try { - this.status = dataAccess.save(status); - } catch (AtlasBaseException e) { - LOG.info("DataMigrationStatusService: Error saving status: {}...", status.getName()); - } - } - - return this.status; - } - - public MigrationImportStatus get() { - return this.status; - } - - public MigrationImportStatus getByName(String name) throws AtlasBaseException { - MigrationImportStatus status = new MigrationImportStatus(name); - - return dataAccess.load(status); - } - - public void deleteStatus() throws AtlasBaseException { - if (this.status == null) { - return; - } - - MigrationImportStatus status = getByName(this.status.getName()); - dataAccess.delete(status.getGuid()); - } - - public void savePosition(String position) { - this.status.setPosition(position); - try { - this.dataAccess.saveNoLoad(this.status); - } catch (AtlasBaseException e) { - LOG.error("Error saving status: {}", position, e); - } - } - - public void setEndTime() { - this.status.setEndTime(System.currentTimeMillis()); - try { - this.dataAccess.saveNoLoad(this.status); - } catch (AtlasBaseException e) { - LOG.error("Error saving status: endTime", e); - } - } - - public MigrationImportStatus createGet(String fileToImport, int streamSize) { - MigrationImportStatus status = new MigrationImportStatus(fileToImport); - status.setSize(streamSize); - - return getCreate(status); - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index 72ffab4..69d78cd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -23,7 +23,6 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; -import org.apache.atlas.model.migration.MigrationImportStatus; import org.apache.atlas.repository.impexp.ImportService; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -51,23 +50,20 @@ public class ZipFileMigrationImporter implements Runnable { private final ImportService importService; private final String fileToImport; - private DataMigrationStatusService dataMigrationStatusService; - public ZipFileMigrationImporter(ImportService importService, String fileName, DataMigrationStatusService dataMigrationStatusService) { + public ZipFileMigrationImporter(ImportService importService, String fileName) { this.importService = importService; this.fileToImport = fileName; - this.dataMigrationStatusService = dataMigrationStatusService; } @Override public void run() { try { - detectFileToImport(); + FileWatcher fileWatcher = new FileWatcher(fileToImport); + fileWatcher.start(); int streamSize = getStreamSizeFromComment(fileToImport); - MigrationImportStatus status = dataMigrationStatusService.createGet(fileToImport, streamSize); - performImport(new FileInputStream(new File(fileToImport)), status.getPosition(), streamSize); - dataMigrationStatusService.setEndTime(); + performImport(new FileInputStream(new File(fileToImport)), streamSize); } catch (IOException e) { LOG.error("Migration Import: IO Error!", e); } catch (AtlasBaseException e) { @@ -75,11 +71,6 @@ public class ZipFileMigrationImporter implements Runnable { } } - private void detectFileToImport() throws IOException { - FileWatcher fileWatcher = new FileWatcher(fileToImport); - fileWatcher.start(); - } - private int getStreamSizeFromComment(String fileToImport) { int ret = 1; try { @@ -105,13 +96,13 @@ public class ZipFileMigrationImporter implements Runnable { return Integer.valueOf(s); } - private void performImport(InputStream fs, String position, int streamSize) throws AtlasBaseException { + private void performImport(InputStream fs, int streamSize) throws AtlasBaseException { try { - LOG.info("Migration Import: {}: Position: {}: Starting...", fileToImport, position); + LOG.info("Migration Import: {}: Starting...", fileToImport); RequestContext.get().setUser(getUserNameFromEnvironment(), null); - importService.run(fs, getImportRequest(streamSize, position), + importService.run(fs, getImportRequest(streamSize), getUserNameFromEnvironment(), InetAddress.getLocalHost().getHostName(), InetAddress.getLocalHost().getHostAddress()); @@ -121,7 +112,6 @@ public class ZipFileMigrationImporter implements Runnable { throw new AtlasBaseException(ex); } finally { LOG.info("Migration Import: {}: Done!", fileToImport); - dataMigrationStatusService.deleteStatus(); } } @@ -129,19 +119,14 @@ public class ZipFileMigrationImporter implements Runnable { return System.getProperty(ENV_USER_NAME); } - private AtlasImportRequest getImportRequest(int streamSize, String position) throws AtlasException { + private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException { AtlasImportRequest request = new AtlasImportRequest(); request.setSizeOption(streamSize); request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true"); request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS)); request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE)); - - request.setOption(AtlasImportRequest.START_POSITION_KEY, - (StringUtils.isEmpty(position) - ? Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()) - : position) - ); + request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())); return request; } diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java deleted file mode 100644 index be541cd..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.ogm; - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.migration.MigrationImportStatus; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -@Component -public class MigrationImportStatusDTO extends AbstractDataTransferObject<MigrationImportStatus> { - public static final String PROPERTY_NAME = "name"; - public static final String PROPERTY_SIZE = "size"; - public static final String PROPERTY_POSITION = "position"; - public static final String PROPERTY_START_TIME = "startTime"; - public static final String PROPERTY_END_TIME = "endTime"; - public static final String PROPERTY_ADDITIONAL_INFO = "additionalInfo"; - - private static final Set<String> ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList(PROPERTY_NAME, - PROPERTY_SIZE, PROPERTY_POSITION, - PROPERTY_START_TIME, PROPERTY_END_TIME, - PROPERTY_ADDITIONAL_INFO)); - - @Inject - public MigrationImportStatusDTO(AtlasTypeRegistry typeRegistry) { - super(typeRegistry, MigrationImportStatus.class, Constants.INTERNAL_PROPERTY_KEY_PREFIX + MigrationImportStatus.class.getSimpleName()); - } - - public static Set<String> getAttributes() { - return ATTRIBUTE_NAMES; - } - - public static MigrationImportStatus from(String guid, Map<String,Object> attributes) { - MigrationImportStatus entry = new MigrationImportStatus(); - - entry.setGuid(guid); - entry.setName((String) attributes.get(PROPERTY_NAME)); - entry.setSize((int) attributes.get(PROPERTY_SIZE)); - entry.setPosition((String) attributes.get(PROPERTY_POSITION)); - entry.setStartTime((long) attributes.get(PROPERTY_START_TIME)); - entry.setEndTime((long) attributes.get(PROPERTY_END_TIME)); - - return entry; - } - - @Override - public MigrationImportStatus from(AtlasEntity entity) { - return from(entity.getGuid(), entity.getAttributes()); - } - - @Override - public MigrationImportStatus from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { - return from(entityWithExtInfo.getEntity()); - } - - @Override - public AtlasEntity toEntity(MigrationImportStatus obj) { - AtlasEntity entity = getDefaultAtlasEntity(obj); - - entity.setAttribute(PROPERTY_NAME, obj.getName()); - entity.setAttribute(PROPERTY_SIZE, obj.getSize()); - entity.setAttribute(PROPERTY_POSITION, obj.getPosition()); - entity.setAttribute(PROPERTY_START_TIME, obj.getStartTime()); - entity.setAttribute(PROPERTY_END_TIME, obj.getEndTime()); - - return entity; - } - - @Override - public AtlasEntity.AtlasEntityWithExtInfo toEntityWithExtInfo(MigrationImportStatus obj) throws AtlasBaseException { - return new AtlasEntity.AtlasEntityWithExtInfo(toEntity(obj)); - } - - @Override - public Map<String, Object> getUniqueAttributes(final MigrationImportStatus obj) { - return Collections.singletonMap(PROPERTY_NAME, obj.getName()); - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java index 72b2f4f..4526002 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java @@ -26,8 +26,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy; @@ -53,13 +53,11 @@ public class BulkImporterImpl implements BulkImporter { private final AtlasEntityStore entityStore; private AtlasTypeRegistry typeRegistry; - private DataMigrationStatusService dataMigrationStatusService; @Inject - public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) { + public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.entityStore = entityStore; this.typeRegistry = typeRegistry; - this.dataMigrationStatusService = dataMigrationStatusService; } @Override @@ -67,7 +65,7 @@ public class BulkImporterImpl implements BulkImporter { ImportStrategy importStrategy = (importResult.getRequest().getOptions() != null && importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) - ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry, dataMigrationStatusService) + ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry) : new RegularImport(this.entityStore, this.typeRegistry); LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index 9819dc2..8c66656 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -26,7 +26,6 @@ import org.apache.atlas.repository.converters.AtlasFormatConverters; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; @@ -45,16 +44,14 @@ public class MigrationImport extends ImportStrategy { private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class); private final AtlasTypeRegistry typeRegistry; - private final DataMigrationStatusService dataMigrationStatusService; private AtlasGraph atlasGraph; private EntityGraphRetriever entityGraphRetriever; private EntityGraphMapper entityGraphMapper; private AtlasEntityStore entityStore; - public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) { + public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { this.typeRegistry = typeRegistry; setupEntityStore(atlasGraphProvider, typeRegistry); - this.dataMigrationStatusService = dataMigrationStatusService; LOG.info("MigrationImport: Using bulkLoading..."); } @@ -70,11 +67,11 @@ public class MigrationImport extends ImportStrategy { int index = 0; int streamSize = entityStream.size(); EntityMutationResponse ret = new EntityMutationResponse(); - EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, dataMigrationStatusService, importResult, streamSize); + EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize); try { LOG.info("Migration Import: Size: {}: Starting...", streamSize); - index = creationManager.read(entityStream, importResult.getRequest().getStartPosition()); + index = creationManager.read(entityStream); creationManager.drain(); creationManager.extractResults(); } catch (Exception ex) { @@ -87,14 +84,14 @@ public class MigrationImport extends ImportStrategy { return ret; } - private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) { + private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) { int batchSize = importResult.getRequest().getOptionKeyBatchSize(); int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); EntityConsumerBuilder consumerBuilder = new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize); - return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, dataMigrationStatusService, importResult, streamSize); + return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize); } private static int getNumWorkers(int numWorkersFromOptions) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java index 89c5429..0051941 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java @@ -22,7 +22,6 @@ import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.pc.WorkItemBuilder; import org.apache.atlas.pc.WorkItemManager; -import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.commons.lang.StringUtils; @@ -31,27 +30,25 @@ import org.slf4j.LoggerFactory; public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager { private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class); - private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min private static final String WORKER_PREFIX = "migration-import"; private final StatusReporter<String, String> statusReporter; - private final DataMigrationStatusService dataMigrationStatusService; private final AtlasImportResult importResult; private final int streamSize; + private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min private String currentTypeName; private float currentPercent; - public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) { + public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) { super(builder, WORKER_PREFIX, batchSize, numWorkers, true); - this.dataMigrationStatusService = dataMigrationStatusService; this.importResult = importResult; this.streamSize = streamSize; this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION); } - public int read(EntityImportStream entityStream, String startPosition) { - int currentIndex = StringUtils.isEmpty(startPosition) ? 0 : Integer.valueOf(startPosition); + public int read(EntityImportStream entityStream) { + int currentIndex = 0; AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) { AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; @@ -106,10 +103,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag return; } - String currentPosition = split[1]; - dataMigrationStatusService.savePosition(currentPosition); importResult.incrementMeticsCounter(split[0]); - this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), getStreamSize(), getCurrentPercent()); + this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent()); } private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
