This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 31125c58c8a6d2edf0bd7de4e31c44e18adc9a7f Author: Ashutosh Mestry <[email protected]> AuthorDate: Wed Dec 9 21:52:59 2020 -0800 ATLAS-4024: Export Service: Export of terms. Signed-off-by: Ashutosh Mestry <[email protected]> --- .../atlas/repository/impexp/ExportService.java | 43 ++++++++++++++++++++-- .../repository/impexp/ExportTypeProcessor.java | 40 +++++++++++++++++++- 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 8af2057..65d7a18 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.impexp; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.glossary.GlossaryService; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasEntity; @@ -38,6 +39,7 @@ import org.apache.atlas.repository.util.UniqueList; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -65,14 +67,17 @@ public class ExportService { private final EntityGraphRetriever entityGraphRetriever; private ExportTypeProcessor exportTypeProcessor; private final HdfsPathEntityCreator hdfsPathEntityCreator; + private final GlossaryService glossaryService; @Inject public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph, - AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) { + AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator, + GlossaryService glossaryService) { this.typeRegistry = typeRegistry; this.entityGraphRetriever = new EntityGraphRetriever(graph, this.typeRegistry); this.auditsWriter = auditsWriter; this.hdfsPathEntityCreator = hdfsPathEntityCreator; + this.glossaryService = glossaryService; this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(graph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE); this.entitiesExtractor = new EntitiesExtractor(graph, typeRegistry); } @@ -84,7 +89,7 @@ public class ExportService { hostName, startTime, getCurrentChangeMarker()); ExportContext context = new ExportContext(result, exportSink); - exportTypeProcessor = new ExportTypeProcessor(typeRegistry); + exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService); try { LOG.info("==> export(user={}, from={})", userName, requestingIP); @@ -264,8 +269,12 @@ public class ExportService { } public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException { - addEntity(entityWithExtInfo, context); exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context); + if (MapUtils.isNotEmpty(context.termsGlossary)) { + addGlossaryEntities(context); + } + + addEntity(entityWithExtInfo, context); context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); entitiesExtractor.get(entityWithExtInfo.getEntity(), context); @@ -280,6 +289,28 @@ public class ExportService { } } + private void addGlossaryEntities(ExportContext context) { + try { + for (String termGuid : context.termsGlossary.keySet()) { + try { + String glossaryGuid = context.termsGlossary.get(termGuid); + if (!context.sink.hasEntity(glossaryGuid)) { + AtlasEntity glossary = entityGraphRetriever.toAtlasEntity(glossaryGuid); + addEntity(new AtlasEntityWithExtInfo(glossary), context); + } + + if (!context.sink.hasEntity(termGuid)) { + AtlasEntity term = entityGraphRetriever.toAtlasEntity(termGuid); + addEntity(new AtlasEntityWithExtInfo(term), context); + } + } catch (AtlasBaseException exception) { + LOG.error("Error fetching Glossary for term: {}", termGuid); + } + } + } finally { + context.clearTerms(); + } + } private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException { if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) { @@ -355,6 +386,8 @@ public class ExportService { final Set<String> enumTypes = new HashSet<>(); final Set<String> relationshipTypes = new HashSet<>(); final Set<String> businessMetadataTypes = new HashSet<>(); + final Map<String, String> termsGlossary = new HashMap<>(); + final AtlasExportResult result; private final ZipSink sink; @@ -471,5 +504,9 @@ public class ExportService { public void addToEntityCreationOrder(String guid) { entityCreationOrder.add(guid); } + + public void clearTerms() { + termsGlossary.clear(); + } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java index a85db5c..186b4b0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java @@ -19,9 +19,13 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.glossary.GlossaryService; import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasBusinessMetadataType; @@ -37,15 +41,21 @@ import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; class ExportTypeProcessor { private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class); + private static final String RELATIONSHIP_ATTR_MEANINGS = "meanings"; private AtlasTypeRegistry typeRegistry; + private GlossaryService glossaryService; - ExportTypeProcessor(AtlasTypeRegistry typeRegistry) { + ExportTypeProcessor(AtlasTypeRegistry typeRegistry, GlossaryService glossaryService) { this.typeRegistry = typeRegistry; + this.glossaryService = glossaryService; } public void addTypes(AtlasEntity entity, ExportService.ExportContext context) { @@ -56,6 +66,34 @@ class ExportTypeProcessor { addClassificationType(c.getTypeName(), context); } } + + addTerms(entity, context); + } + + private void addTerms(AtlasEntity entity, ExportService.ExportContext context) { + Object relAttrMeanings = entity.getRelationshipAttribute(RELATIONSHIP_ATTR_MEANINGS); + if (relAttrMeanings == null || !(relAttrMeanings instanceof List)) { + return; + } + + List list = (List) relAttrMeanings; + if (CollectionUtils.isEmpty(list)) { + return; + } + + for (Object objectId : list) { + if (objectId instanceof AtlasRelatedObjectId) { + AtlasRelatedObjectId termObjectId = (AtlasRelatedObjectId) objectId; + + try { + AtlasGlossaryTerm term = glossaryService.getTerm(termObjectId.getGuid()); + context.termsGlossary.put(termObjectId.getGuid(), term.getAnchor().getGlossaryGuid()); + } + catch (AtlasBaseException e) { + LOG.warn("Error fetching term details: {}", termObjectId); + } + } + } } private void addType(String typeName, ExportService.ExportContext context) {
