This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new ce050f4  NIFI-7009: Atlas reporting task retrieves only the active flow components
ce050f4 is described below

commit ce050f4ecb839df24b8e5235f91a48acfde30e01
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Sat Jan 4 22:48:44 2020 +0100

    NIFI-7009: Atlas reporting task retrieves only the active flow components
    
    Filter out the deleted components before querying them, instead of retrieving
    all the components before filtering.
    
    This closes #3979
---
 .../org/apache/nifi/atlas/NiFiAtlasClient.java     | 38 +++++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
index 49eacd1..e40e034 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
@@ -193,6 +193,7 @@ public class NiFiAtlasClient {
         }
 
         final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity();
+        final Map<String, AtlasEntity> nifiFlowReferredEntities = 
nifiFlowExt.getReferredEntities();
         final Map<String, Object> attributes = nifiFlowEntity.getAttributes();
         final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
         nifiFlow.setExEntity(nifiFlowEntity);
@@ -201,12 +202,12 @@ public class NiFiAtlasClient {
         nifiFlow.setUrl(toStr(attributes.get(ATTR_URL)));
         nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION)));
 
-        
nifiFlow.getQueues().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_QUEUES))));
-        
nifiFlow.getRootInputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_INPUT_PORTS))));
-        
nifiFlow.getRootOutputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_OUTPUT_PORTS))));
+        nifiFlow.getQueues().putAll(fetchFlowComponents(TYPE_NIFI_QUEUE, 
nifiFlowReferredEntities));
+        
nifiFlow.getRootInputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_INPUT_PORT,
 nifiFlowReferredEntities));
+        
nifiFlow.getRootOutputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_OUTPUT_PORT,
 nifiFlowReferredEntities));
 
         final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths();
-        final Map<AtlasObjectId, AtlasEntity> flowPathEntities = 
toQualifiedNameIds(toAtlasObjectIds(attributes.get(ATTR_FLOW_PATHS)));
+        final Map<AtlasObjectId, AtlasEntity> flowPathEntities = 
fetchFlowComponents(TYPE_NIFI_FLOW_PATH, nifiFlowReferredEntities);
 
         for (AtlasEntity flowPathEntity : flowPathEntities.values()) {
             final String pathQualifiedName = 
toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME));
@@ -230,6 +231,35 @@ public class NiFiAtlasClient {
         return nifiFlow;
     }
 
+    /**
+     * Retrieves the flow components of type {@code componentType} from Atlas 
server.
+     * Deleted components will be filtered out before calling Atlas.
+     * Atlas object ids will be initialized with all the attributes (guid, 
type, unique attributes) in order to be able
+     * to match ids retrieved from Atlas (having guid) and ids created by the 
reporting task (not having guid yet).
+     *
+     * @param componentType Atlas type of the flow component (nifi_flow_path, 
nifi_queue, nifi_input_port, nifi_output_port)
+     * @param referredEntities referred entities of the flow entity (returned when the flow is fetched) containing the basic data (id, status) of the flow components
+     * @return flow component entities mapped to their object ids
+     */
+    private Map<AtlasObjectId, AtlasEntity> fetchFlowComponents(String 
componentType, Map<String, AtlasEntity> referredEntities) {
+        return referredEntities.values().stream()
+                .filter(referredEntity -> 
referredEntity.getTypeName().equals(componentType))
+                .filter(referredEntity -> referredEntity.getStatus() == 
AtlasEntity.Status.ACTIVE)
+                .map(referredEntity -> {
+                    final Map<String, Object> uniqueAttributes = 
Collections.singletonMap(ATTR_QUALIFIED_NAME, 
referredEntity.getAttribute(ATTR_QUALIFIED_NAME));
+                    final AtlasObjectId id = new 
AtlasObjectId(referredEntity.getGuid(), componentType, uniqueAttributes);
+                    try {
+                        final AtlasEntity.AtlasEntityWithExtInfo 
fetchedEntityExt = searchEntityDef(id);
+                        return new Tuple<>(id, fetchedEntityExt.getEntity());
+                    } catch (AtlasServiceException e) {
+                        logger.warn("Failed to search entity by id {}, due to 
{}", id, e);
+                        return null;
+                    }
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+    }
+
     @SuppressWarnings("unchecked")
     private List<AtlasObjectId> toAtlasObjectIds(Object _references) {
         if (_references == null) {

Reply via email to