This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 1678b9b5b6bc8bad376776388f2b234e3126bedb Author: Ashutosh Mestry <[email protected]> AuthorDate: Thu Mar 5 16:06:56 2020 -0800 ATLAS-3320: Migration Import implementation. (cherry picked from commit 765ea583b23d25ecc919d520092747c4158466a5) --- .../repository/graphdb/janus/AtlasJanusGraph.java | 2 +- .../atlas/model/impexp/AtlasImportRequest.java | 21 ++ .../apache/atlas/GraphTransactionInterceptor.java | 4 + .../atlas/repository/graph/FullTextMapperV2.java | 9 +- .../atlas/repository/graph/IFullTextMapper.java | 45 +++++ .../atlas/repository/impexp/ImportService.java | 5 +- .../atlas/repository/impexp/ZipSourceDirect.java | 8 + .../migration/ZipFileMigrationImporter.java | 61 +++++- .../repository/store/graph/AtlasEntityStore.java | 8 + .../store/graph/v2/AtlasEntityStoreV2.java | 9 +- .../store/graph/v2/AtlasRelationshipStoreV2.java | 4 +- .../store/graph/v2/BulkImporterImpl.java | 224 +++++---------------- .../store/graph/v2/EntityGraphMapper.java | 10 +- .../graph/v2/bulkimport/FullTextMapperV2Nop.java | 57 ++++++ .../store/graph/v2/bulkimport/ImportStrategy.java | 28 +++ .../store/graph/v2/bulkimport/MigrationImport.java | 124 ++++++++++++ .../RegularImport.java} | 77 +++---- .../graph/v2/bulkimport/pc/EntityConsumer.java | 209 +++++++++++++++++++ .../v2/bulkimport/pc/EntityConsumerBuilder.java | 50 +++++ .../v2/bulkimport/pc/EntityCreationManager.java | 126 ++++++++++++ .../test/java/org/apache/atlas/TestModules.java | 5 + 21 files changed, 833 insertions(+), 253 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index 4acb371..0176ba7 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -116,7 +116,7 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE } } - janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance(); + janusGraph = (StandardJanusGraph) graphInstance; } @Override diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 3362bf1..09dafdf 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -44,6 +44,9 @@ public class AtlasImportRequest implements Serializable { public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMERS_KEY = "transformers"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; + public static final String OPTION_KEY_MIGRATION = "migration"; + public static final String OPTION_KEY_NUM_WORKERS = "numWorkers"; + public static final String OPTION_KEY_BATCH_SIZE = "batchSize"; public static final String OPTION_KEY_FORMAT = "format"; public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect"; private static final String START_POSITION_KEY = "startPosition"; @@ -124,6 +127,24 @@ public class AtlasImportRequest implements Serializable { return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY; } + @JsonIgnore + public int getOptionKeyNumWorkers() { + return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1); + } + + @JsonIgnore + public int getOptionKeyBatchSize() { + return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1); + } + + private int getOptionsValue(String optionKeyBatchSize, int defaultValue) { + String optionsValue = getOptionForKey(optionKeyBatchSize); + + return StringUtils.isEmpty(optionsValue) ? 
+ defaultValue : + Integer.valueOf(optionsValue); + } + @JsonAnySetter public void setOption(String key, String value) { if (null == options) { diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index bbe0dc5..57e454a 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor { return cache.get(guid); } + public static void clearCache() { + guidVertexCache.get().clear(); + } + boolean logException(Throwable t) { if (t instanceof AtlasBaseException) { Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode(); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java index 0f2b4bf..417c96c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java @@ -53,7 +53,7 @@ import java.util.Set; @Component -public class FullTextMapperV2 { +public class FullTextMapperV2 implements IFullTextMapper { private static final Logger LOG = LoggerFactory.getLogger(FullTextMapperV2.class); private static final String FULL_TEXT_DELIMITER = " "; @@ -84,6 +84,8 @@ public class FullTextMapperV2 { * @return Full text string ONLY for the added classifications * @throws AtlasBaseException */ + + @Override public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { String ret = null; final AtlasEntityWithExtInfo entityWithExtInfo; @@ -120,6 +122,7 @@ public class FullTextMapperV2 { return ret; } + @Override public String getIndexTextForEntity(String 
guid) throws AtlasBaseException { String ret = null; final AtlasEntity entity; @@ -150,6 +153,7 @@ public class FullTextMapperV2 { return ret; } + @Override public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException { String ret = null; @@ -271,10 +275,12 @@ public class FullTextMapperV2 { } } + @Override public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException { return getAndCacheEntity(guid, true); } + @Override public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException { RequestContext context = RequestContext.get(); AtlasEntity entity = context.getEntity(guid); @@ -294,6 +300,7 @@ public class FullTextMapperV2 { return entity; } + @Override public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException { RequestContext context = RequestContext.get(); AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java new file mode 100644 index 0000000..2bbf4d8 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/graph/IFullTextMapper.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.graph; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; + +import java.util.List; + +public interface IFullTextMapper { + /** + * Map newly associated/defined classifications for the entity with given GUID + * @param guid Entity guid + * @param classifications new classifications added to the entity + * @return Full text string ONLY for the added classifications + * @throws AtlasBaseException + */ + String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException; + + String getIndexTextForEntity(String guid) throws AtlasBaseException; + + String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException; + + AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException; + + AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException; + + AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException; +} diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 1964ade..c18c4ab 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -250,8 +250,9 @@ public class ImportService { private 
EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { try { - if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && - request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) { + if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && + request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) { + LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size")); return getZipDirectEntityImportStream(request, inputStream); } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java index cb5a7ac..75b8e9e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java @@ -64,6 +64,10 @@ public class ZipSourceDirect implements EntityImportStream { this.zipInputStream = new ZipInputStream(inputStream); this.streamSize = streamSize; prepareStreamForFetch(); + + if (this.streamSize == 1) { + LOG.info("ZipSourceDirect: Stream Size set to: {}. 
This will cause inaccurate percentage reporting.", this.streamSize); + } } @Override @@ -226,6 +230,10 @@ public class ZipSourceDirect implements EntityImportStream { } public int size() { + if (this.streamSize == 1) { + return currentPosition; + } + return this.streamSize; } diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index f552525..35a76ea 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -24,6 +24,8 @@ import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.repository.impexp.ImportService; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +34,20 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; +import java.util.Map; +import java.util.zip.ZipFile; public class ZipFileMigrationImporter implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class); - private static String ENV_USER_NAME = "user.name"; + private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers"; + private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size"; + private static final String DEFAULT_NUMBER_OF_WORKERS = "4"; + private static final String DEFAULT_BATCH_SIZE = "100"; + private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount"; + private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total"; + + private final static String 
ENV_USER_NAME = "user.name"; private final ImportService importService; private final String fileToImport; @@ -52,7 +63,8 @@ public class ZipFileMigrationImporter implements Runnable { FileWatcher fileWatcher = new FileWatcher(fileToImport); fileWatcher.start(); - performImport(new FileInputStream(new File(fileToImport))); + int streamSize = getStreamSizeFromComment(fileToImport); + performImport(new FileInputStream(new File(fileToImport)), streamSize); } catch (IOException e) { LOG.error("Migration Import: IO Error!", e); } catch (AtlasBaseException e) { @@ -60,19 +72,46 @@ public class ZipFileMigrationImporter implements Runnable { } } - private void performImport(InputStream fs) throws AtlasBaseException { + private int getStreamSizeFromComment(String fileToImport) { + int ret = 1; + try { + ZipFile zipFile = new ZipFile(fileToImport); + String comment = zipFile.getComment(); + ret = processZipFileStreamSizeComment(comment); + zipFile.close(); + } catch (IOException e) { + LOG.error("Error opening ZIP file: {}", fileToImport, e); + } + + return ret; + } + + private int processZipFileStreamSizeComment(String comment) { + if (StringUtils.isEmpty(comment)) { + return 1; + } + + Map map = AtlasType.fromJson(comment, Map.class); + int entitiesCount = (int) map.get(ZIP_FILE_COMMENT_ENTITIES_COUNT); + int totalCount = (int) map.get(ZIP_FILE_COMMENT_TOTAL_COUNT); + LOG.info("ZipFileMigrationImporter: Zip file: Comment: streamSize: {}: total: {}", entitiesCount, totalCount); + + return entitiesCount; + } + + private void performImport(InputStream fs, int streamSize) throws AtlasBaseException { try { LOG.info("Migration Import: {}: Starting...", fileToImport); RequestContext.get().setUser(getUserNameFromEnvironment(), null); - importService.run(fs, getImportRequest(), + importService.run(fs, getImportRequest(streamSize), getUserNameFromEnvironment(), InetAddress.getLocalHost().getHostName(), InetAddress.getLocalHost().getHostAddress()); } catch (Exception ex) { - 
LOG.error("Error loading zip for migration", ex); + LOG.error("Migration Import: Error loading zip for migration!", ex); throw new AtlasBaseException(ex); } finally { LOG.info("Migration Import: {}: Done!", fileToImport); @@ -83,9 +122,17 @@ public class ZipFileMigrationImporter implements Runnable { return System.getProperty(ENV_USER_NAME); } - private AtlasImportRequest getImportRequest() throws AtlasException { + private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException { AtlasImportRequest request = new AtlasImportRequest(); - request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT); + + request.setSizeOption(streamSize); + request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true"); + request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS)); + request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE)); + return request; } + private String getPropertyValue(String property, String defaultValue) throws AtlasException { + return ApplicationProperties.get().getString(property, defaultValue); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index b4b76f2..7b9455e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -152,6 +152,14 @@ public interface AtlasEntityStore { EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException; /** + * Create or update entities with parameters necessary for import process without commit. Caller will have to take care of commit.
+ * @param entityStream AtlasEntityStream + * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed + * @throws AtlasBaseException + */ + EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException; + + /** * Update a single entity * @param objectId ID of the entity * @param updatedEntityInfo updated entity information diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 118acfa..379150b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -103,14 +103,14 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { private final DeleteHandlerDelegate deleteDelegate; private final AtlasTypeRegistry typeRegistry; - private final AtlasEntityChangeNotifier entityChangeNotifier; + private final IAtlasEntityChangeNotifier entityChangeNotifier; private final EntityGraphMapper entityGraphMapper; private final EntityGraphRetriever entityRetriever; @Inject public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, - AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) { + IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) { this.deleteDelegate = deleteDelegate; this.typeRegistry = typeRegistry; this.entityChangeNotifier = entityChangeNotifier; @@ -346,6 +346,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { } @Override + public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException { + return createOrUpdate(entityStream, false, true, true); + } + + @Override 
@GraphTransaction public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException { if (LOG.isDebugEnabled()) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java index 6b5615f..7425ac6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java @@ -99,10 +99,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { private final EntityGraphRetriever entityRetriever; private final DeleteHandlerDelegate deleteDelegate; private final GraphHelper graphHelper = GraphHelper.getInstance(); - private final AtlasEntityChangeNotifier entityChangeNotifier; + private final IAtlasEntityChangeNotifier entityChangeNotifier; @Inject - public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, AtlasEntityChangeNotifier entityChangeNotifier) { + public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, IAtlasEntityChangeNotifier entityChangeNotifier) { this.typeRegistry = typeRegistry; this.entityRetriever = new EntityGraphRetriever(typeRegistry); this.deleteDelegate = deleteDelegate; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java index 54c32c5..a4d732a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java @@ -18,33 +18,29 @@ package org.apache.atlas.repository.store.graph.v2; import 
com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.AtlasConfiguration; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.RequestContext; -import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; +import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.BulkImporter; +import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy; +import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport; +import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.Constants; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -55,131 +51,27 @@ public class BulkImporterImpl implements BulkImporter { private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); private final AtlasEntityStore entityStore; - private final EntityGraphRetriever 
entityGraphRetriever; private AtlasTypeRegistry typeRegistry; - private final int MAX_ATTEMPTS = 2; - private boolean directoryBasedImportConfigured; @Inject public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.entityStore = entityStore; - this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.typeRegistry = typeRegistry; - this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); } @Override public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> bulkImport()"); - } - - if (entityStream == null || !entityStream.hasNext()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); - } - - EntityMutationResponse ret = new EntityMutationResponse(); - ret.setGuidAssignments(new HashMap<>()); - - Set<String> processedGuids = new HashSet<>(); - float currentPercent = 0f; - List<String> residualList = new ArrayList<>(); - - EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList); - - while (entityImportStreamWithResidualList.hasNext()) { - AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo(); - AtlasEntity entity = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null; - - if (entity == null) { - continue; - } - - for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { - try { - AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); - EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream); - - if (resp.getGuidAssignments() != null) { - ret.getGuidAssignments().putAll(resp.getGuidAssignments()); - } + ImportStrategy importStrategy = null; - currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, - entityStream.getPosition(), - entityImportStreamWithResidualList.getStreamSize(), - currentPercent); - - entityStream.onImportComplete(entity.getGuid()); - break; - } catch (AtlasBaseException e) { - if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw e; - } - break; - } catch (AtlasSchemaViolationException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Entity: {}", entity.getGuid(), e); - } - - if (attempt == 0) { - updateVertexGuid(entity); - } else { - LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid()); - throw e; - } - } catch (Throwable e) { - AtlasBaseException abe = new AtlasBaseException(e); - if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw abe; - } - - LOG.warn("Exception: {}", entity.getGuid(), e); - break; - } finally { - RequestContext.get().clearCache(); - } - } - } - - importResult.getProcessedEntities().addAll(processedGuids); - LOG.info("bulkImport(): done. 
Total number of entities (including referred entities) imported: {}", processedGuids.size()); - - return ret; - } - - @GraphTransaction - public void updateVertexGuid(AtlasEntity entity) { - String entityGuid = entity.getGuid(); - AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity); - - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - String vertexGuid = null; - try { - vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes()); - } catch (AtlasBaseException e) { - LOG.warn("Entity: {}: Does not exist!", objectId); - return; - } - - if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) { - return; - } - - AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid); - if (v == null) { - return; + if (importResult.getRequest().getOptions() != null && + importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) { + importStrategy = new MigrationImport(new AtlasGraphProvider(), this.typeRegistry); + } else { + importStrategy = new RegularImport(this.entityStore, this.typeRegistry); } - addHistoricalGuid(v, vertexGuid); - AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); - - LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid()); - } - - private void addHistoricalGuid(AtlasVertex v, String vertexGuid) { - String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class); - - AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid)); + LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName()); + return importStrategy.run(entityStream, importResult); } @VisibleForTesting @@ -193,38 +85,16 @@ public class BulkImporterImpl implements BulkImporter { return json; } - private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) { - 
if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) { - return false; - } - - lineageList.add(guid); - - return true; - } - - private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity, - EntityMutationResponse resp, - AtlasImportResult importResult, - Set<String> processedGuids, - int currentIndex, int streamSize, float currentPercent) { - if (!directoryBasedImportConfigured) { - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); - } - - String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); - - return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); - } - @VisibleForTesting - static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { + public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { final double tolerance = 0.000001; final int MAX_PERCENT = 100; int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex; + if (maxSize <= 0) { + return currentPercent; + } + float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize); boolean updateLog = Double.compare(percent, currentPercent) > tolerance; float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? 
++currentPercent : currentPercent); @@ -236,7 +106,7 @@ public class BulkImporterImpl implements BulkImporter { return updatedPercent; } - private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { + public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { if (list == null) { return; } @@ -251,41 +121,37 @@ public class BulkImporterImpl implements BulkImporter { } } - private static class EntityImportStreamWithResidualList { - private final EntityImportStream stream; - private final List<String> residualList; - private boolean navigateResidualList; - private int currentResidualListIndex; - + public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) { + String entityGuid = entity.getGuid(); + AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity); - public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) { - this.stream = stream; - this.residualList = residualList; - this.navigateResidualList = false; - this.currentResidualListIndex = 0; + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + String vertexGuid = null; + try { + vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes()); + } catch (AtlasBaseException e) { + LOG.warn("Entity: {}: Does not exist!", objectId); + return; } - public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { - if (navigateResidualList == false) { - return stream.getNextEntityWithExtInfo(); - } else { - stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++)); - return stream.getNextEntityWithExtInfo(); - } + if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) { + return; } - public boolean 
hasNext() { - if (!navigateResidualList) { - boolean streamHasNext = stream.hasNext(); - navigateResidualList = (streamHasNext == false); - return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size()); - } else { - return (currentResidualListIndex < residualList.size()); - } + AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid); + if (v == null) { + return; } - public int getStreamSize() { - return stream.size() + residualList.size(); - } + addHistoricalGuid(v, vertexGuid); + AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); + + LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid()); + } + + public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) { + String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class); + + AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid)); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 779de2a..d949ed0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -39,7 +39,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.converters.AtlasInstanceConverter; -import org.apache.atlas.repository.graph.FullTextMapperV2; +import org.apache.atlas.repository.graph.IFullTextMapper; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; @@ -133,15 +133,15 @@ public class 
EntityGraphMapper { private final DeleteHandlerDelegate deleteDelegate; private final AtlasTypeRegistry typeRegistry; private final AtlasRelationshipStore relationshipStore; - private final AtlasEntityChangeNotifier entityChangeNotifier; + private final IAtlasEntityChangeNotifier entityChangeNotifier; private final AtlasInstanceConverter instanceConverter; private final EntityGraphRetriever entityRetriever; - private final FullTextMapperV2 fullTextMapperV2; + private final IFullTextMapper fullTextMapperV2; @Inject public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, - AtlasRelationshipStore relationshipStore, AtlasEntityChangeNotifier entityChangeNotifier, - AtlasInstanceConverter instanceConverter, FullTextMapperV2 fullTextMapperV2) { + AtlasRelationshipStore relationshipStore, IAtlasEntityChangeNotifier entityChangeNotifier, + AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2) { this.deleteDelegate = deleteDelegate; this.typeRegistry = typeRegistry; this.graph = atlasGraph; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java new file mode 100644 index 0000000..7b7e543 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/FullTextMapperV2Nop.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.bulkimport; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graph.IFullTextMapper; + +import java.util.List; + +public class FullTextMapperV2Nop implements IFullTextMapper { + @Override + public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { + return null; + } + + @Override + public String getIndexTextForEntity(String guid) throws AtlasBaseException { + return null; + } + + @Override + public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException { + return null; + } + + @Override + public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException { + return null; + } + + @Override + public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException { + return null; + } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException { + return null; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java new file mode 100644 index 0000000..6b70eab --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java @@ -0,0 
+1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v2.bulkimport; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; + +public abstract class ImportStrategy { + public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException; +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java new file mode 100644 index 0000000..4c912fd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v2.bulkimport; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.converters.AtlasFormatConverters; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; +import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier; +import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder; +import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager; +import 
org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MigrationImport extends ImportStrategy { + private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class); + + private final AtlasTypeRegistry typeRegistry; + private AtlasGraph atlasGraph; + private EntityGraphRetriever entityGraphRetriever; + private EntityGraphMapper entityGraphMapper; + private AtlasEntityStore entityStore; + + public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { + this.typeRegistry = typeRegistry; + setupEntityStore(atlasGraphProvider, typeRegistry); + LOG.info("MigrationImport: Using bulkLoading..."); + } + + public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { + if (entityStream == null || !entityStream.hasNext()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); + } + + if (importResult.getRequest() == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request"); + } + + int index = 0; + int streamSize = entityStream.size(); + EntityMutationResponse ret = new EntityMutationResponse(); + EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult); + + try { + LOG.info("Migration Import: Size: {}: Starting...", streamSize); + index = creationManager.read(entityStream); + creationManager.drain(); + creationManager.extractResults(); + } catch (Exception ex) { + LOG.error("Migration Import: Error: Current position: {}", index, ex); + } finally { + shutdownEntityCreationManager(creationManager); + } + + LOG.info("Migration Import: Size: {}: Done!", streamSize); + return ret; + } + + private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) { + int batchSize = 
importResult.getRequest().getOptionKeyBatchSize(); + int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); + + EntityConsumerBuilder consumerBuilder = + new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize); + + return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult); + } + + private static int getNumWorkers(int numWorkersFromOptions) { + int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1; + LOG.info("Migration Import: Setting numWorkers: {}", ret); + return ret; + } + + private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { + this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); + this.atlasGraph = atlasGraphProvider.getBulkLoading(); + DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry); + + IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop(); + AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, entityChangeNotifier); + AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry); + AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters); + this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, entityChangeNotifier, instanceConverter, new FullTextMapperV2Nop()); + this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper); + } + + private void shutdownEntityCreationManager(EntityCreationManager creationManager) { + try { + creationManager.shutdown(); + } catch (InterruptedException e) { + LOG.error("Migration Import: Shutdown: Interrupted!", e); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java similarity index 80% copy from repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java index 54c32c5..ecce1b0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java @@ -6,16 +6,18 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.atlas.repository.store.graph.v2; + +package org.apache.atlas.repository.store.graph.v2.bulkimport; + import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.AtlasConfiguration; @@ -26,22 +28,23 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; -import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.BulkImporter; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import javax.inject.Inject; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -49,27 +52,25 @@ import java.util.List; import java.util.Set; import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress; -@Component -public class BulkImporterImpl implements BulkImporter { - private 
static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); - +public class RegularImport extends ImportStrategy { + private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class); + private static final int MAX_ATTEMPTS = 3; private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; private final EntityGraphRetriever entityGraphRetriever; - private AtlasTypeRegistry typeRegistry; - private final int MAX_ATTEMPTS = 2; private boolean directoryBasedImportConfigured; - @Inject - public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { + public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.entityStore = entityStore; - this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.typeRegistry = typeRegistry; + this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); } @Override - public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { + public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> bulkImport()"); } @@ -81,7 +82,7 @@ public class BulkImporterImpl implements BulkImporter { EntityMutationResponse ret = new EntityMutationResponse(); ret.setGuidAssignments(new HashMap<>()); - Set<String> processedGuids = new HashSet<>(); + Set<String> processedGuids = new HashSet<>(); float currentPercent = 0f; List<String> residualList = new ArrayList<>(); @@ -209,9 +210,9 @@ public class BulkImporterImpl implements BulkImporter { Set<String> processedGuids, int currentIndex, int streamSize, float currentPercent) { if (!directoryBasedImportConfigured) { - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), 
processedGuids, importResult); - updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); + BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); + BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); + BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); } String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); @@ -219,38 +220,6 @@ public class BulkImporterImpl implements BulkImporter { return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); } - @VisibleForTesting - static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { - final double tolerance = 0.000001; - final int MAX_PERCENT = 100; - - int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex; - float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize); - boolean updateLog = Double.compare(percent, currentPercent) > tolerance; - float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? 
++currentPercent : currentPercent); - - if (updateLog) { - log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo); - } - - return updatedPercent; - } - - private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { - if (list == null) { - return; - } - - for (AtlasEntityHeader h : list) { - if (processedGuids.contains(h.getGuid())) { - continue; - } - - processedGuids.add(h.getGuid()); - importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName())); - } - } - private static class EntityImportStreamWithResidualList { private final EntityImportStream stream; private final List<String> residualList; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java new file mode 100644 index 0000000..e8f4b02 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; + +import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.pc.WorkItemConsumer; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport; +import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> { + private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class); + private static final int MAX_COMMIT_RETRY_COUNT = 3; + + private final int batchSize; + private AtomicLong counter = new AtomicLong(1); + private AtomicLong currentBatch = new AtomicLong(1); + + private final AtlasGraph atlasGraph; + private final AtlasEntityStore entityStoreV2; + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityGraphRetriever; + + private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>(); + private List<EntityMutationResponse> localResults = new ArrayList<>(); + + public EntityConsumer(AtlasGraph atlasGraph, 
AtlasEntityStore entityStore, + EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, + BlockingQueue queue, int batchSize) { + super(queue); + + this.atlasGraph = atlasGraph; + this.entityStoreV2 = entityStore; + this.entityGraphRetriever = entityGraphRetriever; + this.typeRegistry = typeRegistry; + this.batchSize = batchSize; + } + + @Override + protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities()) + ? 1 + : entityWithExtInfo.getReferredEntities().size()) + 1; + + long currentCount = counter.addAndGet(delta); + currentBatch.addAndGet(delta); + entityBuffer.add(entityWithExtInfo); + + try { + processEntity(entityWithExtInfo, currentCount); + attemptCommit(); + } catch (Exception e) { + LOG.info("Data loss: Please re-submit!", e); + } + } + + private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) { + try { + RequestContext.get().setImportInProgress(true); + AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); + + LOG.debug("Processing: {}", currentCount); + EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream); + localResults.add(result); + } catch (AtlasBaseException e) { + addResult(entityWithExtInfo.getEntity().getGuid()); + LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e); + } catch (AtlasSchemaViolationException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e); + } + + BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity()); + } + } + + private void attemptCommit() { + if (currentBatch.get() < batchSize) { + return; + } + + doCommit(); + } + + @Override + protected void doCommit() { + for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) { + if 
(commitWithRetry(retryCount)) { + return; + } + } + + LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get()); + clear(); + } + + @Override + protected void commitDirty() { + super.commitDirty(); + LOG.info("Total: Commit: {}", counter.get()); + counter.set(0); + } + + private boolean commitWithRetry(int retryCount) { + try { + atlasGraph.commit(); + if (LOG.isDebugEnabled()) { + LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get()); + } + + dispatchResults(); + return true; + } catch (Exception ex) { + rollbackPauseRetry(retryCount, ex); + return false; + } + } + + private void rollbackPauseRetry(int retryCount, Exception ex) { + atlasGraph.rollback(); + clearCache(); + + LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount); + pause(retryCount); + LOG.warn("Commit error! 
Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex); + retryProcessEntity(retryCount); + } + + private void retryProcessEntity(int retryCount) { + LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); + for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) { + processEntity(e, counter.get()); + } + LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); + } + + private void dispatchResults() { + localResults.stream().forEach(x -> { + addResultsFromResponse(x.getCreatedEntities()); + addResultsFromResponse(x.getUpdatedEntities()); + addResultsFromResponse(x.getDeletedEntities()); + }); + + clear(); + } + + private void pause(int retryCount) { + try { + Thread.sleep(1000 * retryCount); + } catch (InterruptedException e) { + LOG.error("pause: Interrupted!", e); + } + } + + private void addResultsFromResponse(List<AtlasEntityHeader> entities) { + if (CollectionUtils.isEmpty(entities)) { + return; + } + + for (AtlasEntityHeader eh : entities) { + addResult(eh.getGuid()); + } + } + + private void clear() { + localResults.clear(); + entityBuffer.clear(); + clearCache(); + currentBatch.set(0); + } + + private void clearCache() { + GraphTransactionInterceptor.clearCache(); + RequestContext.get().clearCache(); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java new file mode 100644 index 0000000..69d33b2 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.pc.WorkItemBuilder; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.type.AtlasTypeRegistry; + +import java.util.concurrent.BlockingQueue; + +public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> { + private AtlasGraph atlasGraph; + private AtlasEntityStore entityStore; + private final EntityGraphRetriever entityGraphRetriever; + private final AtlasTypeRegistry typeRegistry; + private int batchSize; + + public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore, + EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) { + this.atlasGraph = atlasGraph; + this.entityStore = entityStore; + this.entityGraphRetriever = entityGraphRetriever; + this.typeRegistry = typeRegistry; + this.batchSize = batchSize; + } + + @Override + public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) { + return new EntityConsumer(atlasGraph, 
entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java new file mode 100644 index 0000000..16bb49e --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.StatusReporter;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Producer side of the migration import: reads entities from an {@link EntityImportStream},
+ * submits them to the worker pool set up by {@link WorkItemManager} (workers named with the
+ * "migration-import" prefix), and records progress in an {@link AtlasImportResult}.
+ *
+ * <p>Entities are handled one type at a time. When the incoming entity's type name differs from
+ * the previous one, {@code drain()} is called before switching, so work for the previous type
+ * completes first.
+ *
+ * @param <T> unused by this class; {@link WorkItemManager} is extended as a raw type. Named
+ *            {@code T} so that it does not shadow {@code AtlasEntity.AtlasEntityWithExtInfo}.
+ */
+public class EntityCreationManager<T> extends WorkItemManager {
+    private static final Logger LOG           = LoggerFactory.getLogger(EntityCreationManager.class);
+    private static final String WORKER_PREFIX = "migration-import";
+
+    private final StatusReporter<String, String> statusReporter;
+    private final AtlasImportResult              importResult;
+    private String                               currentTypeName;    // type of the most recently produced entity
+    private float                                currentPercent;     // last progress value from updateImportMetrics()
+    private EntityImportStream                   entityImportStream; // set by read(); its size() drives progress
+
+    /**
+     * @param builder      builder for the consumers that process submitted entities
+     * @param batchSize    batch size passed through to {@link WorkItemManager}
+     * @param numWorkers   number of workers passed through to {@link WorkItemManager}
+     * @param importResult result object whose per-type metrics counters are incremented
+     */
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) {
+        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
+
+        this.importResult   = importResult;
+        this.statusReporter = new StatusReporter<>();
+    }
+
+    /**
+     * Reads every entity from {@code entityStream} and submits it for processing.
+     *
+     * <p>Stream entries without an entity are skipped. If submitting an entity throws, the error is
+     * logged and reading stops; the exception is not propagated.
+     *
+     * @param entityStream source of entities; retained for progress reporting
+     * @return number of entities for which submission was attempted (a failed one is included,
+     *         because the index is incremented before {@code produce} runs)
+     */
+    public int read(EntityImportStream entityStream) {
+        int                                currentIndex = 0;
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
+
+        this.entityImportStream = entityStream;
+
+        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
+            // entityWithExtInfo is non-null here (loop condition); only the wrapped entity can be missing
+            AtlasEntity entity = entityWithExtInfo.getEntity();
+
+            if (entity == null) {
+                continue;
+            }
+
+            try {
+                produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
+            } catch (Throwable e) {
+                // deliberate best-effort behavior: stop reading at the first failure instead of propagating
+                LOG.warn("Exception: {}", entity.getGuid(), e);
+                break;
+            }
+        }
+
+        return currentIndex;
+    }
+
+    /**
+     * Submits one entity. If its type differs from the previous entity's type, drains the pending
+     * work first. Then registers the entity with the status reporter and collects any finished
+     * results.
+     */
+    private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        String previousTypeName = getCurrentTypeName();
+
+        if (StringUtils.isNotEmpty(typeName)
+                && StringUtils.isNotEmpty(previousTypeName)
+                && !StringUtils.equals(previousTypeName, typeName)) {
+            LOG.info("Waiting: '{}' to complete...", previousTypeName);
+            super.drain();
+            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
+        }
+
+        setCurrentTypeName(typeName);
+        // value format "<typeName>:<index>" is parsed back in logStatus()
+        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
+        super.checkProduce(entityWithExtInfo);
+        extractResults();
+    }
+
+    /**
+     * Polls every result currently available from the worker pool, marks each one as processed in
+     * the status reporter, and then updates the import metrics.
+     */
+    public void extractResults() {
+        Object result;
+
+        while ((result = getResults().poll()) != null) {
+            // NOTE(review): assumes results are the GUID strings registered via produced(); cast would fail otherwise
+            statusReporter.processed((String) result);
+        }
+
+        logStatus();
+    }
+
+    /**
+     * Takes the value acknowledged by the status reporter, if there is one, and updates the
+     * per-type metrics counter and the progress percentage.
+     */
+    private void logStatus() {
+        String ack = statusReporter.ack();
+
+        if (StringUtils.isEmpty(ack)) {
+            return;
+        }
+
+        // ack is expected to be the "<typeName>:<index>" value built in produce(). Split on the LAST ':'
+        // so that a type name containing ':' cannot make the index parsing throw.
+        int separatorPos = ack.lastIndexOf(':');
+
+        if (separatorPos < 0 || separatorPos == ack.length() - 1) {
+            return;
+        }
+
+        String typeName = ack.substring(0, separatorPos);
+        int    index    = Integer.parseInt(ack.substring(separatorPos + 1));
+
+        importResult.incrementMeticsCounter(typeName);
+        this.currentPercent = updateImportMetrics(typeName, index, this.entityImportStream.size(), getCurrentPercent());
+    }
+
+    /**
+     * Builds the "last imported" marker for {@code typeName}/{@code currentIndex} and delegates the
+     * progress calculation to {@code BulkImporterImpl.updateImportProgress}.
+     *
+     * @return the progress percentage returned by {@code BulkImporterImpl.updateImportProgress}
+     */
+    private static float updateImportMetrics(String typeName, int currentIndex, int streamSize, float currentPercent) {
+        String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeName, currentIndex);
+
+        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+    }
+
+    private String getCurrentTypeName() {
+        return this.currentTypeName;
+    }
+
+    private void setCurrentTypeName(String typeName) {
+        this.currentTypeName = typeName;
+    }
+
+    private float getCurrentPercent() {
+        return this.currentPercent;
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java index 06e0ebc..a298934 100644 --- a/repository/src/test/java/org/apache/atlas/TestModules.java +++ b/repository/src/test/java/org/apache/atlas/TestModules.java @@ -36,7 +36,9 @@ import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.repository.audit.EntityAuditListener; import org.apache.atlas.repository.audit.EntityAuditListenerV2; import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.graph.FullTextMapperV2; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graph.IFullTextMapper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.GraphDBMigrator; import org.apache.atlas.repository.graphdb.janus.migration.GraphDBGraphSONMigrator; @@ -61,6 +63,7 @@ import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2; import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2; import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier; import org.apache.atlas.runner.LocalSolrRunner; import org.apache.atlas.service.Service; import org.apache.atlas.store.AtlasTypeDefStore; @@ -144,6 +147,8 @@ public class TestModules { bind(AtlasEntityStore.class).to(AtlasEntityStoreV2.class); bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV2.class); + bind(IAtlasEntityChangeNotifier.class).to(AtlasEntityChangeNotifier.class); + bind(IFullTextMapper.class).to(FullTextMapperV2.class); // bind the
DiscoveryService interface to an implementation bind(AtlasDiscoveryService.class).to(EntityDiscoveryService.class).asEagerSingleton();
