rkundam commented on code in PR #399:
URL: https://github.com/apache/atlas/pull/399#discussion_r3479244570


##########
repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java:
##########
@@ -413,6 +436,91 @@ private void addEntity(AtlasEntityWithExtInfo 
entityWithExtInfo, ExportContext c
         context.reportProgress();
     }
 
+    public void addEntityGuids(String guid, ExportContext context, 
RelationshipAttributesExtractor relationshipAttributesExtractor) throws 
AtlasBaseException {
+        AtlasVertex adjacentVertex;
+        Iterator<AtlasEdge> entityEdges;
+        Iterator<AtlasVertex> propagateClassificationVertices;
+        Iterator<AtlasVertex> appliedClassificationVertices;
+        String fetchedClassificationGuid;
+        List<AtlasClassification> processedClassifications = new ArrayList<>();
+
+        AtlasVertex initialEntityVertex = 
entityGraphRetriever.getEntityVertex(guid);
+        for (AtlasClassification currentClassification : 
entityGraphRetriever.getAllClassifications(initialEntityVertex)) {
+            if 
(context.guidsProcessed.contains(currentClassification.getEntityGuid())) {
+                processedClassifications.add(currentClassification);
+            }
+        }
+        context.newAddedGuids.add(guid);
+        while (!context.newAddedGuids.isEmpty()) {
+            String currentGuid = context.newAddedGuids.poll();
+
+            AtlasVertex entityVertex = 
entityGraphRetriever.getEntityVertex(currentGuid);
+            String entityTypeName = getTypeName(entityVertex);
+            List<AtlasClassification> classifications = 
entityGraphRetriever.getAllClassifications(entityVertex);
+            if (CollectionUtils.isNotEmpty(processedClassifications)) {
+                classifications.removeAll(processedClassifications);
+            }
+            if (CollectionUtils.isNotEmpty(classifications)) {
+                for (AtlasClassification classification : classifications) {
+                    String classificationName = classification.getTypeName();
+                    boolean isProcess = 
relationshipAttributesExtractor.isLineageType(entityTypeName);
+                    entityEdges = isProcess
+                            ? GraphHelper.getEdgesForLabel(entityVertex, 
PROCESS_INPUTS, OUT)
+                            : GraphHelper.getEdgesForLabel(entityVertex, 
PROCESS_OUTPUTS, IN);
+                    while (entityEdges.hasNext()) {
+                        AtlasEdge propagationEdge = entityEdges.next();
+                        AtlasVertex outVertex = propagationEdge.getOutVertex();
+                        AtlasVertex inVertex = propagationEdge.getInVertex();
+                        adjacentVertex = 
StringUtils.equals(outVertex.getIdForDisplay(), entityVertex.getIdForDisplay()) 
? inVertex : outVertex;
+                        String adjacentGuid = getGuid(adjacentVertex);
+                        boolean isPropagated = false;
+                        propagateClassificationVertices = 
getClassificationVertices(inVertex, outVertex, isProcess, true, 
classificationName);

Review Comment:
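   The code in lines 477–485 largely duplicates the code in lines 487–496: 
   both fetch classification vertices via getClassificationVertices (differing 
   only in the isPropagated flag), compare CLASSIFICATION_ENTITY_GUID against 
   classification.getEntityGuid(), and call addAdjacentVertices on a match. 
   Consider extracting the shared logic into a single helper method.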



##########
repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java:
##########
@@ -413,6 +436,91 @@ private void addEntity(AtlasEntityWithExtInfo 
entityWithExtInfo, ExportContext c
         context.reportProgress();
     }
 
+    public void addEntityGuids(String guid, ExportContext context, 
RelationshipAttributesExtractor relationshipAttributesExtractor) throws 
AtlasBaseException {
+        AtlasVertex adjacentVertex;
+        Iterator<AtlasEdge> entityEdges;
+        Iterator<AtlasVertex> propagateClassificationVertices;
+        Iterator<AtlasVertex> appliedClassificationVertices;
+        String fetchedClassificationGuid;
+        List<AtlasClassification> processedClassifications = new ArrayList<>();
+
+        AtlasVertex initialEntityVertex = 
entityGraphRetriever.getEntityVertex(guid);
+        for (AtlasClassification currentClassification : 
entityGraphRetriever.getAllClassifications(initialEntityVertex)) {
+            if 
(context.guidsProcessed.contains(currentClassification.getEntityGuid())) {
+                processedClassifications.add(currentClassification);
+            }
+        }
+        context.newAddedGuids.add(guid);
+        while (!context.newAddedGuids.isEmpty()) {
+            String currentGuid = context.newAddedGuids.poll();
+
+            AtlasVertex entityVertex = 
entityGraphRetriever.getEntityVertex(currentGuid);
+            String entityTypeName = getTypeName(entityVertex);
+            List<AtlasClassification> classifications = 
entityGraphRetriever.getAllClassifications(entityVertex);
+            if (CollectionUtils.isNotEmpty(processedClassifications)) {
+                classifications.removeAll(processedClassifications);
+            }
+            if (CollectionUtils.isNotEmpty(classifications)) {
+                for (AtlasClassification classification : classifications) {
+                    String classificationName = classification.getTypeName();
+                    boolean isProcess = 
relationshipAttributesExtractor.isLineageType(entityTypeName);
+                    entityEdges = isProcess
+                            ? GraphHelper.getEdgesForLabel(entityVertex, 
PROCESS_INPUTS, OUT)
+                            : GraphHelper.getEdgesForLabel(entityVertex, 
PROCESS_OUTPUTS, IN);
+                    while (entityEdges.hasNext()) {
+                        AtlasEdge propagationEdge = entityEdges.next();
+                        AtlasVertex outVertex = propagationEdge.getOutVertex();
+                        AtlasVertex inVertex = propagationEdge.getInVertex();
+                        adjacentVertex = 
StringUtils.equals(outVertex.getIdForDisplay(), entityVertex.getIdForDisplay()) 
? inVertex : outVertex;
+                        String adjacentGuid = getGuid(adjacentVertex);
+                        boolean isPropagated = false;
+                        propagateClassificationVertices = 
getClassificationVertices(inVertex, outVertex, isProcess, true, 
classificationName);
+                        while (propagateClassificationVertices.hasNext()) {
+                            AtlasVertex classificationVertex = 
propagateClassificationVertices.next();
+                            fetchedClassificationGuid = 
classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
+                            if 
(StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) 
{
+                                addAdjacentVertices(context, adjacentGuid);
+                                isPropagated = true;
+                            }
+                        }
+                        if (!isPropagated) {
+                            appliedClassificationVertices = 
getClassificationVertices(inVertex, outVertex, isProcess, false, 
classificationName);
+
+                            while (appliedClassificationVertices.hasNext()) {
+                                AtlasVertex classificationVertex = 
appliedClassificationVertices.next();
+                                fetchedClassificationGuid = 
classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
+                                if 
(StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) 
{
+                                    addAdjacentVertices(context, adjacentGuid);
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private Iterator<AtlasVertex> getClassificationVertices(AtlasVertex 
inVertex, AtlasVertex outVertex,
+                                                         boolean isProcess, 
boolean isPropagated, String name) {
+        AtlasVertex base = isProcess ? inVertex : outVertex;
+        return base.query()
+                .direction(AtlasEdgeDirection.OUT)
+                .label(CLASSIFICATION_LABEL)
+                .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, 
isPropagated)
+                .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, name)
+                .vertices().iterator();
+    }
+
+    private void addAdjacentVertices(ExportContext context, String 
adjacentGuid) throws AtlasBaseException {
+        if (!context.newAddedGuids.contains(adjacentGuid)) {
+            context.newAddedGuids.add(adjacentGuid);
+        }
+        if (!context.sink.guids.contains(adjacentGuid)) {
+            
context.addToSink(entityGraphRetriever.toAtlasEntityWithExtInfo(adjacentGuid));

Review Comment:
   Isn't it necessary to call exportTypeProcessor.addTypes before addToSink?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to