This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 54042d35b29f91b46fd033a6378dedf1ff47c5d9 Author: Ashutosh Mestry <[email protected]> AuthorDate: Thu Feb 20 17:04:49 2020 -0800 DataMigration: Automatic resume. --- addons/models/0000-Area0/0010-base_model.json | 50 ++++++++++ .../model/migration/MigrationImportStatus.java | 98 +++++++++++++++++++ .../repository/migration/DataMigrationService.java | 4 +- .../migration/DataMigrationStatusService.java | 104 +++++++++++++++++++++ .../migration/ZipFileMigrationImporter.java | 33 +++++-- .../repository/ogm/MigrationImportStatusDTO.java | 103 ++++++++++++++++++++ .../store/graph/v2/BulkImporterImpl.java | 8 +- .../store/graph/v2/bulkimport/MigrationImport.java | 13 ++- .../v2/bulkimport/pc/EntityCreationManager.java | 15 ++- 9 files changed, 404 insertions(+), 24 deletions(-) diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json index 6bdd2f7..001bb6c 100644 --- a/addons/models/0000-Area0/0010-base_model.json +++ b/addons/models/0000-Area0/0010-base_model.json @@ -256,6 +256,56 @@ ] }, { + "name": "__MigrationImportStatus", + "superTypes": [ + "__internal" + ], + "serviceType": "atlas_core", + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "name", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": true + }, + { + "name": "size", + "typeName": "int", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + }, + { + "name": "position", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + }, + { + "name": "startTime", + "typeName": "long", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + }, + { + "name": "endTime", + "typeName": "long", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + } + ] + }, + { "name": "__AtlasUserSavedSearch", "superTypes": [ "__internal" diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java new file mode 100644 index 0000000..e3f1326 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.model.migration; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.model.AtlasBaseModelObject; + +import java.io.Serializable; +import java.util.Date; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class MigrationImportStatus extends AtlasBaseModelObject implements Serializable { + private String name; + private int size; + private long startTime; + private long endTime; + private String position; + + public MigrationImportStatus() { + } + + public MigrationImportStatus(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getSize() { + return size; + } + + public void setSize(int size) { + this.size = size; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public void setPosition(String position) { + this.position = position; + } + + public String getPosition() { + return this.position; + } + + @Override + protected StringBuilder toString(StringBuilder sb) { + sb.append(", name=").append(name); + sb.append(", size=").append(size); + sb.append(", startTime=").append(startTime); + sb.append(", endTime=").append(endTime); + + return sb; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java index 0a2257e..48f2a2f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java @@ -60,14 +60,14 @@ public class DataMigrationService implements Service { @Inject public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration, GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer, - AtlasTypeRegistry typeRegistry, ImportService importService) { + AtlasTypeRegistry typeRegistry, ImportService importService, DataMigrationStatusService dataMigrationStatusService) { this.configuration = configuration; String fileName = getFileName(); boolean zipFileBasedMigrationImport = StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP); this.thread = (zipFileBasedMigrationImport) - ? new Thread(new ZipFileMigrationImporter(importService, fileName), "zipFileBasedMigrationImporter") + ? new Thread(new ZipFileMigrationImporter(importService, fileName, dataMigrationStatusService), "zipFileBasedMigrationImporter") : new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, fileName, indexer)); } diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java new file mode 100644 index 0000000..b5285d0 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.migration; + +import org.apache.atlas.annotation.AtlasService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.migration.MigrationImportStatus; +import org.apache.atlas.repository.ogm.DataAccess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; + +@AtlasService +public class DataMigrationStatusService { + private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class); + + private final DataAccess dataAccess; + private MigrationImportStatus status; + + @Inject + public DataMigrationStatusService(DataAccess dataAccess) { + this.dataAccess = dataAccess; + } + + public MigrationImportStatus getCreate(MigrationImportStatus status) { + try { + this.status = this.dataAccess.load(status); + this.status.setSize(status.getSize()); + this.status.setStartTime(status.getStartTime()); + + this.status = dataAccess.save(this.status); + } catch (Exception ex) { + LOG.info("DataMigrationStatusService: Setting status: {}...", status.getName()); + try { + this.status = dataAccess.save(status); + } catch (AtlasBaseException e) { + LOG.info("DataMigrationStatusService: Error saving status: {}...", status.getName()); + } + } + + return this.status; + } + + public MigrationImportStatus get() { + return this.status; + } + + public MigrationImportStatus getByName(String name) throws AtlasBaseException { + MigrationImportStatus status = new MigrationImportStatus(name); + + return dataAccess.load(status); + } + + public void deleteStatus() throws AtlasBaseException { + if (this.status == null) { + return; + } + + MigrationImportStatus status = getByName(this.status.getName()); + dataAccess.delete(status.getGuid()); + } + + public void savePosition(String position) { + this.status.setPosition(position); + try { + this.dataAccess.saveNoLoad(this.status); + } catch (AtlasBaseException e) { + LOG.error("Error saving status: {}", position, e); + } + } + + public void setEndTime() { + this.status.setEndTime(System.currentTimeMillis()); + try { + this.dataAccess.saveNoLoad(this.status); + } catch (AtlasBaseException e) { + LOG.error("Error saving status: endTime", e); + } + } + + public MigrationImportStatus createGet(String fileToImport, int streamSize) { + MigrationImportStatus status = new MigrationImportStatus(fileToImport); + status.setSize(streamSize); + + return getCreate(status); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index 69d78cd..72ffab4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.migration.MigrationImportStatus; import org.apache.atlas.repository.impexp.ImportService; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -50,20 +51,23 @@ public class ZipFileMigrationImporter implements Runnable { private final ImportService importService; private final String fileToImport; + private DataMigrationStatusService dataMigrationStatusService; - public ZipFileMigrationImporter(ImportService importService, String fileName) { + public ZipFileMigrationImporter(ImportService importService, String fileName, DataMigrationStatusService dataMigrationStatusService) { this.importService = importService; this.fileToImport = fileName; + this.dataMigrationStatusService = dataMigrationStatusService; } @Override public void run() { try { - FileWatcher fileWatcher = new FileWatcher(fileToImport); - fileWatcher.start(); + detectFileToImport(); int streamSize = getStreamSizeFromComment(fileToImport); - performImport(new FileInputStream(new File(fileToImport)), streamSize); + MigrationImportStatus status = dataMigrationStatusService.createGet(fileToImport, streamSize); + performImport(new FileInputStream(new File(fileToImport)), status.getPosition(), streamSize); + dataMigrationStatusService.setEndTime(); } catch (IOException e) { LOG.error("Migration Import: IO Error!", e); } catch (AtlasBaseException e) { @@ -71,6 +75,11 @@ public class ZipFileMigrationImporter implements Runnable { } } + private void detectFileToImport() throws IOException { + FileWatcher fileWatcher = new FileWatcher(fileToImport); + fileWatcher.start(); + } + private int getStreamSizeFromComment(String fileToImport) { int ret = 1; try { @@ -96,13 +105,13 @@ public class ZipFileMigrationImporter implements Runnable { return Integer.valueOf(s); } - private void performImport(InputStream fs, int streamSize) throws AtlasBaseException { + private void performImport(InputStream fs, String position, int streamSize) throws AtlasBaseException { try { - LOG.info("Migration Import: {}: Starting...", fileToImport); + LOG.info("Migration Import: {}: Position: {}: Starting...", fileToImport, position); RequestContext.get().setUser(getUserNameFromEnvironment(), null); - importService.run(fs, getImportRequest(streamSize), + importService.run(fs, getImportRequest(streamSize, position), getUserNameFromEnvironment(), InetAddress.getLocalHost().getHostName(), InetAddress.getLocalHost().getHostAddress()); @@ -112,6 +121,7 @@ public class ZipFileMigrationImporter implements Runnable { throw new AtlasBaseException(ex); } finally { LOG.info("Migration Import: {}: Done!", fileToImport); + dataMigrationStatusService.deleteStatus(); } } @@ -119,14 +129,19 @@ public class ZipFileMigrationImporter implements Runnable { return System.getProperty(ENV_USER_NAME); } - private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException { + private AtlasImportRequest getImportRequest(int streamSize, String position) throws AtlasException { AtlasImportRequest request = new AtlasImportRequest(); request.setSizeOption(streamSize); request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true"); request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS)); request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE)); - request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())); + + request.setOption(AtlasImportRequest.START_POSITION_KEY, + (StringUtils.isEmpty(position) + ? Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()) + : position) + ); return request; } diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java new file mode 100644 index 0000000..be541cd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/MigrationImportStatusDTO.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.ogm; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.migration.MigrationImportStatus; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +@Component +public class MigrationImportStatusDTO extends AbstractDataTransferObject<MigrationImportStatus> { + public static final String PROPERTY_NAME = "name"; + public static final String PROPERTY_SIZE = "size"; + public static final String PROPERTY_POSITION = "position"; + public static final String PROPERTY_START_TIME = "startTime"; + public static final String PROPERTY_END_TIME = "endTime"; + public static final String PROPERTY_ADDITIONAL_INFO = "additionalInfo"; + + private static final Set<String> ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList(PROPERTY_NAME, + PROPERTY_SIZE, PROPERTY_POSITION, + PROPERTY_START_TIME, PROPERTY_END_TIME, + PROPERTY_ADDITIONAL_INFO)); + + @Inject + public MigrationImportStatusDTO(AtlasTypeRegistry typeRegistry) { + super(typeRegistry, MigrationImportStatus.class, Constants.INTERNAL_PROPERTY_KEY_PREFIX + MigrationImportStatus.class.getSimpleName()); + } + + public static Set<String> getAttributes() { + return ATTRIBUTE_NAMES; + } + + public static MigrationImportStatus from(String guid, Map<String,Object> attributes) { + MigrationImportStatus entry = new MigrationImportStatus(); + + entry.setGuid(guid); + entry.setName((String) attributes.get(PROPERTY_NAME)); + entry.setSize((int) attributes.get(PROPERTY_SIZE)); + entry.setPosition((String) attributes.get(PROPERTY_POSITION)); + entry.setStartTime((long) attributes.get(PROPERTY_START_TIME)); + entry.setEndTime((long) attributes.get(PROPERTY_END_TIME)); + + return entry; + } + + @Override + public MigrationImportStatus from(AtlasEntity entity) { + return from(entity.getGuid(), entity.getAttributes()); + } + + @Override + public MigrationImportStatus from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + return from(entityWithExtInfo.getEntity()); + } + + @Override + public AtlasEntity toEntity(MigrationImportStatus obj) { + AtlasEntity entity = getDefaultAtlasEntity(obj); + + entity.setAttribute(PROPERTY_NAME, obj.getName()); + entity.setAttribute(PROPERTY_SIZE, obj.getSize()); + entity.setAttribute(PROPERTY_POSITION, obj.getPosition()); + entity.setAttribute(PROPERTY_START_TIME, obj.getStartTime()); + entity.setAttribute(PROPERTY_END_TIME, obj.getEndTime()); + + return entity; + } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo toEntityWithExtInfo(MigrationImportStatus obj) throws AtlasBaseException { + return new AtlasEntity.AtlasEntityWithExtInfo(toEntity(obj)); + } + + @Override + public Map<String, Object> getUniqueAttributes(final MigrationImportStatus obj) { + return Collections.singletonMap(PROPERTY_NAME, obj.getName()); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java index 4526002..72b2f4f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java @@ -26,8 +26,8 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.graph.AtlasGraphProvider; -import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy; @@ -53,11 +53,13 @@ public class BulkImporterImpl implements BulkImporter { private final AtlasEntityStore entityStore; private AtlasTypeRegistry typeRegistry; + private DataMigrationStatusService dataMigrationStatusService; @Inject - public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { + public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) { this.entityStore = entityStore; this.typeRegistry = typeRegistry; + this.dataMigrationStatusService = dataMigrationStatusService; } @Override @@ -65,7 +67,7 @@ public class BulkImporterImpl implements BulkImporter { ImportStrategy importStrategy = (importResult.getRequest().getOptions() != null && importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) - ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry) + ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry, dataMigrationStatusService) : new RegularImport(this.entityStore, this.typeRegistry); LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index 8c66656..9819dc2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -26,6 +26,7 @@ import org.apache.atlas.repository.converters.AtlasFormatConverters; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; @@ -44,14 +45,16 @@ public class MigrationImport extends ImportStrategy { private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class); private final AtlasTypeRegistry typeRegistry; + private final DataMigrationStatusService dataMigrationStatusService; private AtlasGraph atlasGraph; private EntityGraphRetriever entityGraphRetriever; private EntityGraphMapper entityGraphMapper; private AtlasEntityStore entityStore; - public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { + public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry, DataMigrationStatusService dataMigrationStatusService) { this.typeRegistry = typeRegistry; setupEntityStore(atlasGraphProvider, typeRegistry); + this.dataMigrationStatusService = dataMigrationStatusService; LOG.info("MigrationImport: Using bulkLoading..."); } @@ -67,11 +70,11 @@ public class MigrationImport extends ImportStrategy { int index = 0; int streamSize = entityStream.size(); EntityMutationResponse ret = new EntityMutationResponse(); - EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize); + EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, dataMigrationStatusService, importResult, streamSize); try { LOG.info("Migration Import: Size: {}: Starting...", streamSize); - index = creationManager.read(entityStream); + index = creationManager.read(entityStream, importResult.getRequest().getStartPosition()); creationManager.drain(); creationManager.extractResults(); } catch (Exception ex) { @@ -84,14 +87,14 @@ public class MigrationImport extends ImportStrategy { return ret; } - private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) { + private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) { int batchSize = importResult.getRequest().getOptionKeyBatchSize(); int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); EntityConsumerBuilder consumerBuilder = new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize); - return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize); + return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, dataMigrationStatusService, importResult, streamSize); } private static int getNumWorkers(int numWorkersFromOptions) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java index 0051941..89c5429 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java @@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.pc.WorkItemBuilder; import org.apache.atlas.pc.WorkItemManager; +import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.commons.lang.StringUtils; @@ -30,25 +31,27 @@ import org.slf4j.LoggerFactory; public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager { private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class); + private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min private static final String WORKER_PREFIX = "migration-import"; private final StatusReporter<String, String> statusReporter; + private final DataMigrationStatusService dataMigrationStatusService; private final AtlasImportResult importResult; private final int streamSize; - private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min private String currentTypeName; private float currentPercent; - public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) { + public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, DataMigrationStatusService dataMigrationStatusService, AtlasImportResult importResult, int streamSize) { super(builder, WORKER_PREFIX, batchSize, numWorkers, true); + this.dataMigrationStatusService = dataMigrationStatusService; this.importResult = importResult; this.streamSize = streamSize; this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION); } - public int read(EntityImportStream entityStream) { - int currentIndex = 0; + public int read(EntityImportStream entityStream, String startPosition) { + int currentIndex = StringUtils.isEmpty(startPosition) ? 0 : Integer.valueOf(startPosition); AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) { AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; @@ -103,8 +106,10 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag return; } + String currentPosition = split[1]; + dataMigrationStatusService.savePosition(currentPosition); importResult.incrementMeticsCounter(split[0]); - this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent()); + this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), getStreamSize(), getCurrentPercent()); } private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
