This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 8810c44 ATLAS-4473: GlossaryTerms Bulk Create Performance Improvement 8810c44 is described below commit 8810c4477037dafbb518df57ff823c70127052d6 Author: Mandar Ambawane <mandar.ambaw...@freestoneinfotech.com> AuthorDate: Tue Nov 16 09:44:39 2021 +0530 ATLAS-4473: GlossaryTerms Bulk Create Performance Improvement Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../org/apache/atlas/glossary/GlossaryService.java | 186 +++++++++++++++------ .../apache/atlas/glossary/GlossaryTermUtils.java | 44 +++-- 2 files changed, 161 insertions(+), 69 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java index 9c84598..f81b538 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java @@ -18,6 +18,7 @@ package org.apache.atlas.glossary; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; import org.apache.atlas.SortOrder; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.bulkimport.BulkImportResponse; @@ -30,6 +31,8 @@ import org.apache.atlas.model.glossary.relations.AtlasRelatedCategoryHeader; import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader; import org.apache.atlas.model.glossary.relations.AtlasTermCategorizationHeader; import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; @@ -38,6 +41,7 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import 
org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.FileUtils; import org.apache.atlas.utils.AtlasJson; +import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -50,6 +54,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -64,9 +69,16 @@ import static org.apache.atlas.glossary.GlossaryUtils.getGlossarySkeleton; @Service public class GlossaryService { - private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class); - private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled(); - private static final String QUALIFIED_NAME_ATTR = "qualifiedName"; + private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class); + private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled(); + private static final String ATLAS_GLOSSARY_TERM = "AtlasGlossaryTerm"; + private static final String NAME_ATTR = "name"; + private static final String QUALIFIED_NAME_ATTR = "qualifiedName"; + private static final String GLOSSARY_QUALIFIED_NAME_PROPERTY = "AtlasGlossary." + QUALIFIED_NAME_ATTR; + private static final String GLOSSARY_CATEGORY_NAME_PROPERTY = "AtlasGlossaryCategory.name"; + private static final String GLOSSARY_TERM_NAME_PROPERTY = ATLAS_GLOSSARY_TERM + "." + NAME_ATTR; + private static final String TERM_UNIQUE_QUALIFIED_NAME_PROPERTY = ATLAS_GLOSSARY_TERM + ".__u_" + QUALIFIED_NAME_ATTR; + private static final String GLOSSARY_TERM_ANCHOR_EDGE_LABEL = "r:AtlasGlossaryTermAnchor"; private final DataAccess dataAccess; private final GlossaryTermUtils glossaryTermUtils; @@ -76,6 +88,9 @@ public class GlossaryService { private static final char[] invalidNameChars = { '@', '.' 
}; + private static final Map<String, String> glossaryGuidQualifiedNameCache = new HashMap<>(); + private static final Map<String, String> categoryGuidNameCache = new HashMap<>(); + @Inject public GlossaryService(DataAccess dataAccess, final AtlasRelationshipStore relationshipStore, final AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) { @@ -327,65 +342,77 @@ public class GlossaryService { } @GraphTransaction - public AtlasGlossaryTerm createTerm(AtlasGlossaryTerm glossaryTerm) throws AtlasBaseException { + public AtlasGlossaryTerm createTerm(AtlasGlossaryTerm term) throws AtlasBaseException { if (DEBUG_ENABLED) { - LOG.debug("==> GlossaryService.create({})", glossaryTerm); + LOG.debug("==> GlossaryService.create({})", term); } - if (Objects.isNull(glossaryTerm)) { + + if (Objects.isNull(term)) { throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "GlossaryTerm definition missing"); } - if (Objects.isNull(glossaryTerm.getAnchor())) { + + if (Objects.isNull(term.getAnchor())) { throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ANCHOR); } - if (StringUtils.isEmpty(glossaryTerm.getName())) { + + if (StringUtils.isEmpty(term.getName())) { throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_QUALIFIED_NAME_CANT_BE_DERIVED); } - if (isNameInvalid(glossaryTerm.getName())){ + if (isNameInvalid(term.getName())){ throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); } else { // Derive the qualifiedName - String anchorGlossaryGuid = glossaryTerm.getAnchor().getGlossaryGuid(); - AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(anchorGlossaryGuid)); - glossaryTerm.setQualifiedName(glossaryTerm.getName() + "@" + glossary.getQualifiedName()); + String anchorGlossaryGuid = term.getAnchor().getGlossaryGuid(); + String glossaryQualifiedName = getGlossaryQualifiedName(anchorGlossaryGuid); + + if (StringUtils.isEmpty(glossaryQualifiedName)) { + throw new 
AtlasBaseException(AtlasErrorCode.GLOSSARY_QUALIFIED_NAME_CANT_BE_DERIVED); + } + + term.setQualifiedName(term.getName() + "@" + glossaryQualifiedName); if (LOG.isDebugEnabled()) { - LOG.debug("Derived qualifiedName = {}", glossaryTerm.getQualifiedName()); + LOG.debug("Derived qualifiedName = {}", term.getQualifiedName()); } } // This might fail for the case when the term's qualifiedName has been updated and the duplicate request comes in with old name - if (termExists(glossaryTerm)) { - throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, glossaryTerm.getQualifiedName()); + if (termExists2(term)) { + throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, term.getQualifiedName()); } - AtlasGlossaryTerm storeObject = dataAccess.save(glossaryTerm); - glossaryTermUtils.processTermRelations(storeObject, glossaryTerm, GlossaryUtils.RelationshipOperation.CREATE); + AtlasGlossaryTerm storeGlossaryTerm = dataAccess.save(term); + + glossaryTermUtils.processTermRelations(storeGlossaryTerm, term, GlossaryUtils.RelationshipOperation.CREATE); // Re-load term after handling relations - storeObject = dataAccess.load(glossaryTerm); - setInfoForRelations(storeObject); + storeGlossaryTerm = dataAccess.load(term); + setInfoForRelations(storeGlossaryTerm); if (DEBUG_ENABLED) { - LOG.debug("<== GlossaryService.create() : {}", storeObject); + LOG.debug("<== GlossaryService.create() : {}", storeGlossaryTerm); } - return storeObject; + + return storeGlossaryTerm; } @GraphTransaction - public List<AtlasGlossaryTerm> createTerms(List<AtlasGlossaryTerm> glossaryTerm) throws AtlasBaseException { + public List<AtlasGlossaryTerm> createTerms(List<AtlasGlossaryTerm> glossaryTerms) throws AtlasBaseException { if (DEBUG_ENABLED) { - LOG.debug("==> GlossaryService.create({})", glossaryTerm); + LOG.debug("==> GlossaryService.create({})", glossaryTerms); } - if (Objects.isNull(glossaryTerm)) { + if (Objects.isNull(glossaryTerms)) { throw new 
AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "glossaryTerm(s) is null/empty"); } List<AtlasGlossaryTerm> ret = new ArrayList<>(); - for (AtlasGlossaryTerm atlasGlossaryTerm : glossaryTerm) { - ret.add(createTerm(atlasGlossaryTerm)); + for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) { + AtlasGlossaryTerm term = createTerm(glossaryTerm); + + ret.add(term); } if (LOG.isDebugEnabled()) { @@ -939,13 +966,64 @@ public class GlossaryService { return Objects.nonNull(vertex); } + private String getGlossaryQualifiedName(String glossaryGuid) { + String ret = glossaryGuidQualifiedNameCache.get(glossaryGuid); + + if (StringUtils.isEmpty(ret)) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(glossaryGuid); + + if (vertex != null) { + ret = vertex.getProperty(GLOSSARY_QUALIFIED_NAME_PROPERTY, String.class); + + glossaryGuidQualifiedNameCache.put(glossaryGuid, ret); + } + } + + return ret; + } + + private String getGlossaryCategoryName(String glossaryCategoryGuid) { + String ret = categoryGuidNameCache.get(glossaryCategoryGuid); + + if (StringUtils.isEmpty(ret)) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(glossaryCategoryGuid); + + if (vertex != null) { + ret = vertex.getProperty(GLOSSARY_CATEGORY_NAME_PROPERTY, String.class); + + categoryGuidNameCache.put(glossaryCategoryGuid, ret); + } + } + + return ret; + } + private boolean termExists(AtlasGlossaryTerm term) { AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(atlasTypeRegistry.getEntityTypeByName(GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME), new HashMap<String, Object>() {{ put(QUALIFIED_NAME_ATTR, term.getQualifiedName()); }}); + return Objects.nonNull(vertex); } + private boolean termExists2(AtlasGlossaryTerm term) { + boolean ret = false; + AtlasVertex glossaryVertex = AtlasGraphUtilsV2.findByGuid(term.getAnchor().getGlossaryGuid()); + Iterable<AtlasEdge> glossaryTermEdges = glossaryVertex.getEdges(AtlasEdgeDirection.OUT, GLOSSARY_TERM_ANCHOR_EDGE_LABEL); + + for (Iterator<AtlasEdge> iter = 
glossaryTermEdges.iterator(); iter.hasNext(); ) { + AtlasVertex termVertex = iter.next().getInVertex(); + String termQualifiedName = termVertex.getProperty(TERM_UNIQUE_QUALIFIED_NAME_PROPERTY, String.class); + + if (StringUtils.equals(termQualifiedName, term.getQualifiedName())) { + ret = true; + break; + } + } + + return ret; + } + private boolean categoryExists(AtlasGlossaryCategory category) { AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(atlasTypeRegistry.getEntityTypeByName(GlossaryUtils.ATLAS_GLOSSARY_CATEGORY_TYPENAME), new HashMap<String, Object>() {{ put(QUALIFIED_NAME_ATTR, category.getQualifiedName()); @@ -979,7 +1057,7 @@ public class GlossaryService { private void setInfoForRelations(final AtlasGlossary ret) throws AtlasBaseException { if (Objects.nonNull(ret.getTerms())) { - setInfoForTerms(ret.getTerms()); + setDisplayNameForTerms(ret.getTerms()); } if (Objects.nonNull(ret.getCategories())) { @@ -987,13 +1065,12 @@ public class GlossaryService { } } - private void setInfoForRelations(final AtlasGlossaryTerm ret) throws AtlasBaseException { - if (Objects.nonNull(ret.getCategories())) { - setDisplayNameForTermCategories(ret.getCategories()); - } - if (Objects.nonNull(ret.getRelatedTerms())) { - for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : ret.getRelatedTerms().entrySet()) { - setInfoForTerms(entry.getValue()); + private void setInfoForRelations(final AtlasGlossaryTerm glossaryTerm) throws AtlasBaseException { + setDisplayNameForCategories(glossaryTerm.getCategories()); + + if (Objects.nonNull(glossaryTerm.getRelatedTerms())) { + for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : glossaryTerm.getRelatedTerms().entrySet()) { + setDisplayNameForTerms(entry.getValue()); } } } @@ -1003,18 +1080,23 @@ public class GlossaryService { setInfoForRelatedCategories(glossaryCategory.getChildrenCategories()); } if (Objects.nonNull(glossaryCategory.getTerms())) { - 
setInfoForTerms(glossaryCategory.getTerms()); + setDisplayNameForTerms(glossaryCategory.getTerms()); } } - private void setDisplayNameForTermCategories(final Set<AtlasTermCategorizationHeader> categorizationHeaders) throws AtlasBaseException { - List<AtlasGlossaryCategory> categories = categorizationHeaders - .stream() - .map(id -> getAtlasGlossaryCategorySkeleton(id.getCategoryGuid())) - .collect(Collectors.toList()); - Map<String, AtlasGlossaryCategory> categoryMap = new HashMap<>(); - dataAccess.load(categories).forEach(c -> categoryMap.put(c.getGuid(), c)); - categorizationHeaders.forEach(c -> c.setDisplayText(categoryMap.get(c.getCategoryGuid()).getName())); + private void setDisplayNameForCategories(final Set<AtlasTermCategorizationHeader> categorizationHeaders) throws AtlasBaseException { + if (CollectionUtils.isEmpty(categorizationHeaders)) { + return; + } + + for (AtlasTermCategorizationHeader termCategorizationHeader : categorizationHeaders) { + String categoryGuid = termCategorizationHeader.getCategoryGuid(); + String categoryName = getGlossaryCategoryName(categoryGuid); + + if (StringUtils.isNotEmpty(categoryName)) { + termCategorizationHeader.setDisplayText(categoryName); + } + } } private void setInfoForRelatedCategories(final Collection<AtlasRelatedCategoryHeader> categoryHeaders) throws AtlasBaseException { @@ -1033,15 +1115,17 @@ public class GlossaryService { } } - private void setInfoForTerms(final Collection<AtlasRelatedTermHeader> termHeaders) throws AtlasBaseException { - List<AtlasGlossaryTerm> terms = termHeaders - .stream() - .map(id -> getAtlasGlossaryTermSkeleton(id.getTermGuid())) - .collect(Collectors.toList()); - Map<String, AtlasGlossaryTerm> termMap = new HashMap<>(); - dataAccess.load(terms).iterator().forEachRemaining(t -> termMap.put(t.getGuid(), t)); + private void setDisplayNameForTerms(final Collection<AtlasRelatedTermHeader> termHeaders) throws AtlasBaseException { - termHeaders.forEach(t -> 
t.setDisplayText(getDisplayText(termMap.get(t.getTermGuid())))); + for (AtlasRelatedTermHeader termHeader : termHeaders) { + String termGuid = termHeader.getTermGuid(); + AtlasVertex termVertex = AtlasGraphUtilsV2.findByGuid(termGuid); + String termDisplayText = termVertex.getProperty(GLOSSARY_TERM_NAME_PROPERTY, String.class); + + if (StringUtils.isNotEmpty(termDisplayText)) { + termHeader.setDisplayText(termDisplayText); + } + } } public static boolean isNameInvalid(String name) { diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java index d92daee..8a161e4 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java @@ -18,6 +18,7 @@ package org.apache.atlas.glossary; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; import org.apache.atlas.bulkimport.BulkImportResponse; import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo; import org.apache.atlas.exception.AtlasBaseException; @@ -37,6 +38,7 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.type.AtlasRelationshipType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.FileUtils; +import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.ArrayUtils; @@ -161,58 +163,62 @@ public class GlossaryTermUtils extends GlossaryUtils { return StringUtils.equals(relatedObjectId.getRelationshipGuid(), storeObject.getRelationshipGuid()); } - private void processTermAnchor(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException { + private void processTermAnchor(AtlasGlossaryTerm currentTerm, AtlasGlossaryTerm updatedTerm, 
RelationshipOperation op) throws AtlasBaseException { if (Objects.isNull(updatedTerm.getAnchor()) && op != RelationshipOperation.DELETE) { throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ANCHOR); } - AtlasGlossaryHeader existingAnchor = storeObject.getAnchor(); - AtlasGlossaryHeader updatedTermAnchor = updatedTerm.getAnchor(); + AtlasGlossaryHeader currentTermGlossary = currentTerm.getAnchor(); // glossary_g1 + AtlasGlossaryHeader updatedTermGlossary = updatedTerm.getAnchor(); // glossary_g2 + String updatedTermGlossaryGuid = updatedTermGlossary.getGlossaryGuid(); + String currentTermGlossaryGuid = currentTermGlossary.getGlossaryGuid(); switch (op) { case CREATE: - if (Objects.isNull(updatedTermAnchor.getGlossaryGuid())) { + if (Objects.isNull(updatedTermGlossaryGuid)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_NEW_ANCHOR_GUID); } else { if (DEBUG_ENABLED) { - LOG.debug("Creating new term anchor, category = {}, glossary = {}", storeObject.getGuid(), updatedTerm.getAnchor().getGlossaryGuid()); + LOG.debug("Creating new term anchor, category = {}, glossary = {}", currentTerm.getGuid(), updatedTerm.getAnchor().getGlossaryGuid()); } - createRelationship(defineTermAnchorRelation(updatedTermAnchor.getGlossaryGuid(), storeObject.getGuid())); + if (!StringUtils.equals(updatedTermGlossaryGuid, currentTermGlossaryGuid)) { + createRelationship(defineTermAnchorRelation(updatedTermGlossaryGuid, currentTerm.getGuid())); + } } break; case UPDATE: - if (!Objects.equals(updatedTermAnchor, existingAnchor)) { - if (Objects.isNull(updatedTermAnchor.getGlossaryGuid())) { + if (!Objects.equals(updatedTermGlossary, currentTermGlossary)) { + if (Objects.isNull(updatedTermGlossaryGuid)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_NEW_ANCHOR_GUID); } if (DEBUG_ENABLED) { LOG.debug("Updating term anchor, currAnchor = {}, newAnchor = {} and term = {}", - existingAnchor.getGlossaryGuid(), - updatedTermAnchor.getGlossaryGuid(), - storeObject.getName()); + 
currentTermGlossaryGuid, + updatedTermGlossaryGuid, + currentTerm.getName()); } - relationshipStore.deleteById(existingAnchor.getRelationGuid(), true); + relationshipStore.deleteById(currentTermGlossary.getRelationGuid(), true); // Derive the qualifiedName when anchor changes - String anchorGlossaryGuid = updatedTermAnchor.getGlossaryGuid(); + String anchorGlossaryGuid = updatedTermGlossaryGuid; AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(anchorGlossaryGuid)); - storeObject.setQualifiedName(storeObject.getName() + "@" + glossary.getQualifiedName()); + currentTerm.setQualifiedName(currentTerm.getName() + "@" + glossary.getQualifiedName()); if (LOG.isDebugEnabled()) { - LOG.debug("Derived qualifiedName = {}", storeObject.getQualifiedName()); + LOG.debug("Derived qualifiedName = {}", currentTerm.getQualifiedName()); } - createRelationship(defineTermAnchorRelation(updatedTermAnchor.getGlossaryGuid(), storeObject.getGuid())); + createRelationship(defineTermAnchorRelation(updatedTermGlossaryGuid, currentTerm.getGuid())); } break; case DELETE: - if (Objects.nonNull(existingAnchor)) { + if (Objects.nonNull(currentTermGlossary)) { if (DEBUG_ENABLED) { LOG.debug("Deleting term anchor"); } - relationshipStore.deleteById(existingAnchor.getRelationGuid(), true); + relationshipStore.deleteById(currentTermGlossary.getRelationGuid(), true); } break; } @@ -221,6 +227,7 @@ public class GlossaryTermUtils extends GlossaryUtils { private void processRelatedTerms(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException { Map<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> newRelatedTerms = updatedTerm.getRelatedTerms(); Map<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> existingRelatedTerms = storeObject.getRelatedTerms(); + switch (op) { case CREATE: for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : newRelatedTerms.entrySet()) { @@ -313,6 +320,7 @@ public class 
GlossaryTermUtils extends GlossaryUtils { private void processAssociatedCategories(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException { Map<String, AtlasTermCategorizationHeader> newCategories = getAssociatedCategories(updatedTerm); Map<String, AtlasTermCategorizationHeader> existingCategories = getAssociatedCategories(storeObject); + switch (op) { case CREATE: if (Objects.nonNull(newCategories)) {