http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
new file mode 100644
index 0000000..feb2b48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+    /**
+     * Lazily created shared instance. It is declared {@code volatile} because {@code getInstance()} checks it
+     * without holding the class lock; the Java Memory Model only guarantees that such an unsynchronized read
+     * observes a fully constructed object when the field is volatile.
+     */
+    private static volatile NiFiAtlasClient nifiClient;
+
+    /** Client used for every Atlas request issued by this class; assigned by {@code initialize}. */
+    private AtlasClientV2 atlasClient;
+
+    private NiFiAtlasClient() {
+        super();
+    }
+
+    /**
+     * Returns the shared {@code NiFiAtlasClient}, creating it on the first call.
+     * Creation is guarded by the {@code NiFiAtlasClient.class} monitor so that only one instance is ever built.
+     *
+     * @return the shared instance, never null
+     */
+    public static NiFiAtlasClient getInstance() {
+        // Fast path: the instance already exists, no locking required.
+        if (nifiClient != null) {
+            return nifiClient;
+        }
+
+        synchronized (NiFiAtlasClient.class) {
+            // Re-check under the lock, another thread may have created it in the meantime.
+            if (nifiClient == null) {
+                nifiClient = new NiFiAtlasClient();
+            }
+            return nifiClient;
+        }
+    }
+
+    /**
+     * Creates the Atlas client used by this instance, replacing any client created by an earlier call.
+     *
+     * <p>When a client already exists, {@code ApplicationProperties.forceReload()} is invoked before the new
+     * client is created. When {@code atlasConfDir} is not null, its absolute path is stored in the
+     * {@code atlas.conf} system property. The whole operation runs while holding the
+     * {@code NiFiAtlasClient.class} monitor.</p>
+     *
+     * @param baseUrls passed to {@code authN} to create the client
+     * @param authN creates the Atlas client
+     * @param atlasConfDir directory to expose through the {@code atlas.conf} system property, may be null
+     */
+    public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) {
+
+        synchronized (NiFiAtlasClient.class) {
+
+            final boolean alreadySetUp = atlasClient != null;
+            if (alreadySetUp) {
+                logger.info("{} had been setup but replacing it with new one.", atlasClient);
+                ApplicationProperties.forceReload();
+            }
+
+            // If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
+            if (atlasConfDir != null) {
+                final String confPropertyName = "atlas.conf";
+                final Properties systemProperties = System.getProperties();
+                systemProperties.setProperty(confPropertyName, atlasConfDir.getAbsolutePath());
+                logger.debug("{} has been set to: {}", confPropertyName, systemProperties.getProperty(confPropertyName));
+            }
+
+            atlasClient = authN.createClient(baseUrls);
+        }
+    }
+
+    /**
+     * Utility to delete type definitions that are no longer used.
+     * Intended for development or testing only.
+     *
+     * @param typeNames names of the types to delete
+     * @throws AtlasServiceException if an Atlas request fails
+     */
+    void deleteTypeDefs(String ... typeNames) throws AtlasServiceException {
+        final AtlasTypesDef targetTypeDefs = getTypeDefs(typeNames);
+        try {
+            atlasClient.deleteAtlasTypeDefs(targetTypeDefs);
+        } catch (UniformInterfaceException e) {
+            final boolean noContent = e.getResponse().getStatus() == 204;
+            if (!noContent) {
+                throw e;
+            }
+            // 204 is a successful response.
+            // NOTE: However after executing this, Atlas should be restarted to work properly.
+            logger.info("Deleted type defs: {}", targetTypeDefs);
+        }
+    }
+
+    /**
+     * Checks whether every type listed in {@code ENTITIES} is already defined in Atlas.
+     *
+     * @return True when required NiFi types are already created.
+     * @throws AtlasServiceException if fetching the type definitions fails
+     */
+    public boolean isNiFiTypeDefsRegistered() throws AtlasServiceException {
+        final Set<String> requiredTypeNames = ENTITIES.keySet();
+        final String[] requiredTypeNameArray = requiredTypeNames.toArray(new String[requiredTypeNames.size()]);
+
+        // Index the definitions Atlas knows about by their type name.
+        final List<AtlasEntityDef> fetchedDefs = getTypeDefs(requiredTypeNameArray).getEntityDefs();
+        final Map<String, AtlasEntityDef> defsByName = fetchedDefs.stream()
+                .collect(Collectors.toMap(AtlasEntityDef::getName, Function.identity()));
+
+        return defsByName.keySet().containsAll(requiredTypeNames);
+    }
+
+    /**
+     * Create or update NiFi types in Atlas type system.
+     * @param update If false, doesn't perform anything if there is existing 
type def for the name.
+     */
+    public void registerNiFiTypeDefs(boolean update) throws 
AtlasServiceException {
+        final Set<String> typeNames = ENTITIES.keySet();
+        final Map<String, AtlasEntityDef> existingDefs = 
getTypeDefs(typeNames.toArray(new 
String[typeNames.size()])).getEntityDefs().stream()
+                .collect(Collectors.toMap(AtlasEntityDef::getName, 
Function.identity()));
+
+
+        final AtomicBoolean shouldUpdate = new AtomicBoolean(false);
+
+        final AtlasTypesDef type = new AtlasTypesDef();
+
+        typeNames.stream().filter(typeName -> {
+            final AtlasEntityDef existingDef = existingDefs.get(typeName);
+            if (existingDef != null) {
+                // type is already defined.
+                if (!update) {
+                    return false;
+                }
+                shouldUpdate.set(true);
+            }
+            return true;
+        }).forEach(typeName -> {
+            final NiFiTypes.EntityDefinition def = ENTITIES.get(typeName);
+
+            final AtlasEntityDef entity = new AtlasEntityDef();
+            type.getEntityDefs().add(entity);
+
+            entity.setName(typeName);
+
+            Set<String> superTypes = new HashSet<>();
+            List<AtlasAttributeDef> attributes = new ArrayList<>();
+
+            def.define(entity, superTypes, attributes);
+
+            entity.setSuperTypes(superTypes);
+            entity.setAttributeDefs(attributes);
+        });
+
+        // Create or Update.
+        final AtlasTypesDef atlasTypeDefsResult = shouldUpdate.get()
+                ? atlasClient.updateAtlasTypeDefs(type)
+                : atlasClient.createAtlasTypeDefs(type);
+        logger.debug("Result={}", atlasTypeDefsResult);
+    }
+
+    /**
+     * Fetches entity type definitions from Atlas, issuing one search request per type name,
+     * and merges the returned entity definitions into a single {@code AtlasTypesDef}.
+     *
+     * @param typeNames names of the types to look up
+     * @return the collected entity definitions
+     * @throws AtlasServiceException if a request to Atlas fails
+     */
+    private AtlasTypesDef getTypeDefs(String ... typeNames) throws AtlasServiceException {
+        final AtlasTypesDef collectedTypeDefs = new AtlasTypesDef();
+        for (final String typeName : typeNames) {
+            final MultivaluedMap<String, String> nameFilter = new MultivaluedMapImpl();
+            nameFilter.add(SearchFilter.PARAM_NAME, typeName);
+
+            final AtlasTypesDef found = atlasClient.getAllTypeDefs(new SearchFilter(nameFilter));
+            collectedTypeDefs.getEntityDefs().addAll(found.getEntityDefs());
+        }
+        logger.debug("typeDefs={}", collectedTypeDefs);
+        return collectedTypeDefs;
+    }
+
+    /**
+     * Captures the process group id from the URL attribute of a flow path entity.
+     * Declared as a constant so the expression is compiled once; {@link Pattern} instances are immutable.
+     */
+    private static final Pattern FLOW_PATH_URL_PATTERN = Pattern.compile("^http.+processGroupId=([0-9a-z\\-]+).*$");
+
+    /**
+     * Fetch existing NiFiFlow entity from Atlas.
+     * @param rootProcessGroupId The id of a NiFi flow root process group.
+     * @param clusterName The cluster name of a flow.
+     * @return A NiFiFlow instance filled with retrieved data from Atlas. Status objects are left blank, e.g. ProcessorStatus.
+     *         Null is returned if the search completed without returning an entity.
+     * @throws AtlasServiceException Thrown if requesting to Atlas API failed, including when the flow is not found.
+     */
+    public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException {
+
+        final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId);
+        final AtlasObjectId flowId = new AtlasObjectId(TYPE_NIFI_FLOW, ATTR_QUALIFIED_NAME, qualifiedName);
+        final AtlasEntity.AtlasEntityWithExtInfo nifiFlowExt = searchEntityDef(flowId);
+
+        if (nifiFlowExt == null || nifiFlowExt.getEntity() == null) {
+            return null;
+        }
+
+        final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity();
+        final Map<String, Object> attributes = nifiFlowEntity.getAttributes();
+        final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
+        nifiFlow.setExEntity(nifiFlowEntity);
+        nifiFlow.setFlowName(toStr(attributes.get(ATTR_NAME)));
+        nifiFlow.setClusterName(clusterName);
+        nifiFlow.setUrl(toStr(attributes.get(ATTR_URL)));
+        nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION)));
+
+        // Resolve the references held by the flow entity into active entities keyed by qualified-name ids.
+        nifiFlow.getQueues().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_QUEUES))));
+        nifiFlow.getRootInputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_INPUT_PORTS))));
+        nifiFlow.getRootOutputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_OUTPUT_PORTS))));
+
+        final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths();
+        final Map<AtlasObjectId, AtlasEntity> flowPathEntities = toQualifiedNameIds(toAtlasObjectIds(attributes.get(ATTR_FLOW_PATHS)));
+
+        for (AtlasEntity flowPathEntity : flowPathEntities.values()) {
+            final String pathQualifiedName = toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME));
+            final NiFiFlowPath flowPath = new NiFiFlowPath(getComponentIdFromQualifiedName(pathQualifiedName));
+            if (flowPathEntity.hasAttribute(ATTR_URL)) {
+                // The attribute may be present without a value; Pattern.matcher does not accept null.
+                final String flowPathUrl = toStr(flowPathEntity.getAttribute(ATTR_URL));
+                if (flowPathUrl != null) {
+                    final Matcher urlMatcher = FLOW_PATH_URL_PATTERN.matcher(flowPathUrl);
+                    if (urlMatcher.matches()) {
+                        flowPath.setGroupId(urlMatcher.group(1));
+                    }
+                }
+            }
+            flowPath.setExEntity(flowPathEntity);
+            flowPath.setName(toStr(flowPathEntity.getAttribute(ATTR_NAME)));
+            flowPath.getInputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_INPUTS))).keySet());
+            flowPath.getOutputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_OUTPUTS))).keySet());
+            flowPath.startTrackingChanges(nifiFlow);
+
+            flowPaths.put(flowPath.getId(), flowPath);
+        }
+
+        nifiFlow.startTrackingChanges();
+        return nifiFlow;
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<AtlasObjectId> toAtlasObjectIds(Object _references) {
+        if (_references == null) {
+            return Collections.emptyList();
+        }
+        List<Map<String, Object>> references = (List<Map<String, Object>>) 
_references;
+        return references.stream()
+                .map(ref -> new AtlasObjectId(toStr(ref.get(ATTR_GUID)), 
toStr(ref.get(ATTR_TYPENAME)), ref))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * <p>AtlasObjectIds returned from Atlas have GUID, but do not have qualifiedName, while ones created by the reporting task
+     * do not have GUID, but qualifiedName. AtlasObjectId.equals returns false for this combination.
+     * In order to match ids correctly, this method fetches actual entities from ids to get the qualifiedName attribute.</p>
+     *
+     * <p>Also, AtlasObjectIds returned from Atlas do not have entity state.
+     * If Atlas is configured to use soft-delete (default), deleted ids are still returned.
+     * Fetched entities are used to determine whether an AtlasObjectId is still active or deleted.
+     * Deleted entities will not be included in the result of this method.
+     * Ids whose entity could not be fetched are logged and left out of the result as well.
+     * </p>
+     * @param ids to convert
+     * @return AtlasObjectIds with qualifiedName, mapped to the entity each one refers to
+     */
+    private Map<AtlasObjectId, AtlasEntity> toQualifiedNameIds(List<AtlasObjectId> ids) {
+        if (ids == null) {
+            return Collections.emptyMap();
+        }
+
+        return ids.stream().distinct()
+                .map(this::toQualifiedNameIdEntry)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+    }
+
+    /**
+     * Fetches the entity for a single id and pairs it with an id carrying the qualifiedName attribute.
+     *
+     * @param id the id to resolve
+     * @return the pair, or null if the entity is deleted, was not returned, or the lookup failed
+     */
+    private Tuple<AtlasObjectId, AtlasEntity> toQualifiedNameIdEntry(final AtlasObjectId id) {
+        final AtlasEntity entity;
+        try {
+            final AtlasEntity.AtlasEntityWithExtInfo entityExt = searchEntityDef(id);
+            entity = entityExt != null ? entityExt.getEntity() : null;
+        } catch (AtlasServiceException e) {
+            // Pass the exception text explicitly and the exception itself last, so that both the message
+            // and the stack trace are logged regardless of how the logging backend treats a trailing Throwable.
+            logger.warn("Failed to search entity by id {}, due to {}", id, e.toString(), e);
+            return null;
+        }
+
+        if (entity == null) {
+            // Same guard as fetchNiFiFlow applies to its own lookup: a response without an entity is skipped.
+            logger.debug("No entity was returned for id {}", id);
+            return null;
+        }
+        if (AtlasEntity.Status.DELETED.equals(entity.getStatus())) {
+            return null;
+        }
+
+        final Map<String, Object> uniqueAttrs = Collections.singletonMap(ATTR_QUALIFIED_NAME, entity.getAttribute(ATTR_QUALIFIED_NAME));
+        return new Tuple<>(new AtlasObjectId(id.getGuid(), id.getTypeName(), uniqueAttrs), entity);
+    }
+
+    /**
+     * Sends the given NiFi flow to Atlas: the parent nifi_flow entity first, then the DataSet entities,
+     * then the flow path entities, and finally the parent entity again if any of its attributes changed.
+     *
+     * @param nifiFlow the flow to register
+     * @throws AtlasServiceException if an Atlas request fails, except for the "instance not found" response
+     *         Atlas returns when a previously existing flow path has been removed (see the note in the body)
+     */
+    public void registerNiFiFlow(NiFiFlow nifiFlow) throws AtlasServiceException {
+
+        // Create parent flow entity, so that common properties are taken over.
+        final AtlasEntity flowEntity = registerNiFiFlowEntity(nifiFlow);
+
+        // Create DataSet entities those are created by this NiFi flow.
+        final Map<String, List<AtlasEntity>> updatedDataSetEntities = registerDataSetEntities(nifiFlow);
+
+        // Create path entities.
+        final Set<AtlasObjectId> remainingPathIds = registerFlowPathEntities(nifiFlow);
+
+        // Update these attributes only if anything is created, updated or removed.
+        boolean shouldUpdateNiFiFlow = nifiFlow.isMetadataUpdated();
+        if (remainingPathIds != null) {
+            flowEntity.setAttribute(ATTR_FLOW_PATHS, remainingPathIds);
+            shouldUpdateNiFiFlow = true;
+        }
+        if (updatedDataSetEntities.containsKey(TYPE_NIFI_QUEUE)) {
+            flowEntity.setAttribute(ATTR_QUEUES, updatedDataSetEntities.get(TYPE_NIFI_QUEUE));
+            shouldUpdateNiFiFlow = true;
+        }
+        if (updatedDataSetEntities.containsKey(TYPE_NIFI_INPUT_PORT)) {
+            flowEntity.setAttribute(ATTR_INPUT_PORTS, updatedDataSetEntities.get(TYPE_NIFI_INPUT_PORT));
+            shouldUpdateNiFiFlow = true;
+        }
+        if (updatedDataSetEntities.containsKey(TYPE_NIFI_OUTPUT_PORT)) {
+            flowEntity.setAttribute(ATTR_OUTPUT_PORTS, updatedDataSetEntities.get(TYPE_NIFI_OUTPUT_PORT));
+            shouldUpdateNiFiFlow = true;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("### NiFi Flow Audit Logs START");
+            nifiFlow.getUpdateAudit().forEach(logger::debug);
+            nifiFlow.getFlowPaths().forEach((k, v) -> {
+                logger.debug("--- NiFiFlowPath Audit Logs: {}", k);
+                v.getUpdateAudit().forEach(logger::debug);
+            });
+            logger.debug("### NiFi Flow Audit Logs END");
+        }
+
+        if (shouldUpdateNiFiFlow) {
+            // Send updated entities.
+            final List<AtlasEntity> entities = new ArrayList<>();
+            final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
+            entities.add(flowEntity);
+            try {
+                final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
+                logger.debug("mutation response={}", mutationResponse);
+            } catch (AtlasServiceException e) {
+                if (!isInstanceNotFound(e)) {
+                    throw e;
+                }
+                // NOTE: If previously existed nifi_flow_path entity is removed because the path is removed from NiFi,
+                // then Atlas respond with 404 even though the entity is successfully updated.
+                // Following exception is thrown in this case. Just log it.
+                // org.apache.atlas.AtlasServiceException:
+                // Metadata service API org.apache.atlas.AtlasBaseClient$APIInfo@45a37759
+                // failed with status 404 (Not Found) Response Body
+                // ({"errorCode":"ATLAS-404-00-00B","errorMessage":"Given instance is invalid/not found:
+                // Could not find entities in the repository with guids: [96d24487-cd66-4795-b552-f00b426fed26]"})
+                logger.debug("Received error response from Atlas but it should be stored.", e);
+            }
+        }
+    }
+
+    /**
+     * Tells whether the exception carries the Atlas "instance not found" error.
+     * The status and the message are null-checked so that an exception lacking either one is reported
+     * as-is by the caller instead of being replaced by a NullPointerException.
+     */
+    private static boolean isInstanceNotFound(final AtlasServiceException e) {
+        return e.getStatus() != null
+                && e.getStatus().getStatusCode() == AtlasErrorCode.INSTANCE_NOT_FOUND.getHttpCode().getStatusCode()
+                && e.getMessage() != null
+                && e.getMessage().contains(AtlasErrorCode.INSTANCE_NOT_FOUND.getErrorCode());
+    }
+
+    /**
+     * Builds the nifi_flow entity for the given flow and, if it has not been stored in Atlas yet, creates it
+     * so that its GUID becomes available to the entities registered afterwards.
+     *
+     * @param nifiFlow the flow to build the entity from
+     * @return the existing entity as-is when the flow metadata is unchanged, otherwise the refreshed entity
+     * @throws AtlasServiceException if creating the entity in Atlas fails
+     */
+    private AtlasEntity registerNiFiFlowEntity(final NiFiFlow nifiFlow) throws AtlasServiceException {
+        if (!nifiFlow.isMetadataUpdated()) {
+            // Nothing has been changed, return existing entity.
+            return nifiFlow.getExEntity();
+        }
+
+        // Create parent flow entity using existing NiFiFlow entity if available, so that common properties are taken over.
+        final AtlasEntity existingEntity = nifiFlow.getExEntity();
+        final AtlasEntity flowEntity;
+        if (existingEntity == null) {
+            flowEntity = new AtlasEntity();
+        } else {
+            flowEntity = new AtlasEntity(existingEntity);
+        }
+        flowEntity.setTypeName(TYPE_NIFI_FLOW);
+        flowEntity.setVersion(1L);
+        flowEntity.setAttribute(ATTR_NAME, nifiFlow.getFlowName());
+        flowEntity.setAttribute(ATTR_QUALIFIED_NAME, nifiFlow.toQualifiedName(nifiFlow.getRootProcessGroupId()));
+        flowEntity.setAttribute(ATTR_URL, nifiFlow.getUrl());
+        flowEntity.setAttribute(ATTR_DESCRIPTION, nifiFlow.getDescription());
+
+        final boolean alreadyPersisted = !flowEntity.getGuid().startsWith("-");
+        if (alreadyPersisted) {
+            return flowEntity;
+        }
+
+        // If flowEntity is not persisted yet, then store nifi_flow entity to make nifiFlowId available for other entities.
+        final List<AtlasEntity> newEntities = new ArrayList<>();
+        final AtlasEntity.AtlasEntitiesWithExtInfo createRequest = new AtlasEntity.AtlasEntitiesWithExtInfo(newEntities);
+        newEntities.add(flowEntity);
+
+        final EntityMutationResponse mutationResponse = atlasClient.createEntities(createRequest);
+        logger.debug("Registered a new nifi_flow entity, mutation response={}", mutationResponse);
+
+        final String assignedNiFiFlowGuid = mutationResponse.getGuidAssignments().get(flowEntity.getGuid());
+        flowEntity.setGuid(assignedNiFiFlowGuid);
+        nifiFlow.setAtlasGuid(assignedNiFiFlowGuid);
+        return flowEntity;
+    }
+
+    /**
+     * Sends created and updated DataSet entities (queues, root input ports, root output ports) of the flow to Atlas.
+     * Newly created entities get the GUID assigned by Atlas and replace their GUID-less counterpart in the flow.
+     *
+     * @param nifiFlow the flow whose changed DataSet entities are registered
+     * @return for every type name having at least one changed entity, the entities of that type that remain
+     *         (i.e. not deleted); the list is empty when all entities of the type were deleted
+     * @throws AtlasServiceException if an Atlas request fails
+     */
+    private Map<String, List<AtlasEntity>> registerDataSetEntities(final NiFiFlow nifiFlow) throws AtlasServiceException {
+
+        final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> entitiesByChange = nifiFlow.getChangedDataSetEntities();
+
+        if (entitiesByChange.containsKey(CREATED)) {
+            final List<AtlasEntity> newEntities = entitiesByChange.get(CREATED);
+            final EntityMutationResponse createResponse =
+                    atlasClient.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(newEntities));
+            logger.debug("Created DataSet entities mutation response={}", createResponse);
+
+            final Map<String, String> assignedGuids = createResponse.getGuidAssignments();
+            for (final AtlasEntity newEntity : newEntities) {
+
+                final String assignedGuid = assignedGuids.get(newEntity.getGuid());
+                final String qualifiedName = toStr(newEntity.getAttribute(ATTR_QUALIFIED_NAME));
+                if (StringUtils.isEmpty(assignedGuid)) {
+                    logger.warn("GUID was not assigned for {}::{} for some reason.", newEntity.getTypeName(), qualifiedName);
+                    continue;
+                }
+
+                // Pick the flow-side map that tracks entities of this type.
+                final Map<AtlasObjectId, AtlasEntity> trackedEntities;
+                switch (newEntity.getTypeName()) {
+                    case TYPE_NIFI_INPUT_PORT:
+                        trackedEntities = nifiFlow.getRootInputPortEntities();
+                        break;
+                    case TYPE_NIFI_OUTPUT_PORT:
+                        trackedEntities = nifiFlow.getRootOutputPortEntities();
+                        break;
+                    case TYPE_NIFI_QUEUE:
+                        trackedEntities = nifiFlow.getQueues();
+                        break;
+                    default:
+                        throw new RuntimeException(newEntity.getTypeName() + " is not expected.");
+                }
+
+                // In order to replace the id, remove current id which does not have GUID.
+                findIdByQualifiedName(trackedEntities.keySet(), qualifiedName).ifPresent(trackedEntities::remove);
+                newEntity.setGuid(assignedGuid);
+                final Map<String, Object> uniqueAttributes = Collections.singletonMap(ATTR_QUALIFIED_NAME, qualifiedName);
+                trackedEntities.put(new AtlasObjectId(assignedGuid, newEntity.getTypeName(), uniqueAttributes), newEntity);
+            }
+        }
+
+        if (entitiesByChange.containsKey(UPDATED)) {
+            final List<AtlasEntity> modifiedEntities = entitiesByChange.get(UPDATED);
+            final EntityMutationResponse updateResponse =
+                    atlasClient.updateEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(modifiedEntities));
+            logger.debug("Updated DataSet entities mutation response={}", updateResponse);
+        }
+
+        // Type names having at least one entity that is not AS_IS.
+        final Set<String> changedTypeNames = new HashSet<>();
+        for (final Map.Entry<NiFiFlow.EntityChangeType, List<AtlasEntity>> change : entitiesByChange.entrySet()) {
+            if (AS_IS.equals(change.getKey())) {
+                continue;
+            }
+            for (final AtlasEntity changedEntity : change.getValue()) {
+                changedTypeNames.add(changedEntity.getTypeName());
+            }
+        }
+
+        // NOTE: Cascading DELETE will be performed when parent NiFiFlow is updated without removed DataSet entities.
+        final Map<String, List<AtlasEntity>> remainingEntitiesByType = entitiesByChange.entrySet().stream()
+                .filter(change -> !DELETED.equals(change.getKey()))
+                .flatMap(change -> change.getValue().stream())
+                .filter(remaining -> changedTypeNames.contains(remaining.getTypeName()))
+                .collect(Collectors.groupingBy(AtlasEntity::getTypeName));
+
+        // If all entities are deleted for a type (e.g. nifi_input_port), then remainingEntitiesByType will not contain such key.
+        // If the returning map does not contain anything for a type, then the corresponding attribute will not be updated.
+        // To empty an attribute when all of its elements are deleted, add empty list for a type.
+        for (final String changedTypeName : changedTypeNames) {
+            if (!remainingEntitiesByType.containsKey(changedTypeName)) {
+                remainingEntitiesByType.put(changedTypeName, Collections.emptyList());
+            }
+        }
+        return remainingEntitiesByType;
+    }
+
+    /**
+     * Sends created and updated flow path entities of the flow to Atlas and stores each sent entity
+     * on its {@code NiFiFlowPath}. Created entities receive the GUID assigned by Atlas first.
+     *
+     * @param nifiFlow the flow whose changed flow path entities are registered
+     * @return ids of all flow paths that are not deleted, or null when
+     *         {@code EntityChangeType.containsChange} reports no change for the flow paths
+     * @throws AtlasServiceException if an Atlas request fails
+     */
+    private Set<AtlasObjectId> registerFlowPathEntities(final NiFiFlow nifiFlow) throws AtlasServiceException {
+
+        final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> pathsByChange = nifiFlow.getChangedFlowPathEntities();
+
+        if (pathsByChange.containsKey(CREATED)) {
+            final List<AtlasEntity> newPaths = pathsByChange.get(CREATED);
+            final EntityMutationResponse createResponse =
+                    atlasClient.createEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(newPaths));
+            logger.debug("Created FlowPath entities mutation response={}", createResponse);
+
+            final Map<String, String> assignedGuids = createResponse.getGuidAssignments();
+            for (final AtlasEntity newPath : newPaths) {
+                // Swap the GUID the entity was sent with for the one Atlas assigned.
+                newPath.setGuid(assignedGuids.get(newPath.getGuid()));
+                final String pathId = getComponentIdFromQualifiedName(toStr(newPath.getAttribute(ATTR_QUALIFIED_NAME)));
+                nifiFlow.getFlowPaths().get(pathId).setExEntity(newPath);
+            }
+        }
+
+        if (pathsByChange.containsKey(UPDATED)) {
+            final List<AtlasEntity> modifiedPaths = pathsByChange.get(UPDATED);
+            final EntityMutationResponse updateResponse =
+                    atlasClient.updateEntities(new AtlasEntity.AtlasEntitiesWithExtInfo(modifiedPaths));
+            logger.debug("Updated FlowPath entities mutation response={}", updateResponse);
+
+            for (final AtlasEntity modifiedPath : modifiedPaths) {
+                final String pathId = getComponentIdFromQualifiedName(toStr(modifiedPath.getAttribute(ATTR_QUALIFIED_NAME)));
+                nifiFlow.getFlowPaths().get(pathId).setExEntity(modifiedPath);
+            }
+        }
+
+        if (!NiFiFlow.EntityChangeType.containsChange(pathsByChange.keySet())) {
+            return null;
+        }
+
+        // Collect ids of every path that still exists, i.e. everything but the deleted ones.
+        final Set<AtlasObjectId> remainingPathIds = new HashSet<>();
+        for (final Map.Entry<NiFiFlow.EntityChangeType, List<AtlasEntity>> change : pathsByChange.entrySet()) {
+            if (DELETED.equals(change.getKey())) {
+                continue;
+            }
+            for (final AtlasEntity path : change.getValue()) {
+                remainingPathIds.add(new AtlasObjectId(path.getGuid(), TYPE_NIFI_FLOW_PATH,
+                        Collections.singletonMap(ATTR_QUALIFIED_NAME, path.getAttribute(ATTR_QUALIFIED_NAME))));
+            }
+        }
+        return remainingPathIds;
+    }
+
+    /**
+     * Looks up an entity in Atlas. The GUID of the id is used when it is not empty;
+     * otherwise the entity is searched by the id's type name and its non-null unique attributes.
+     *
+     * @param id identifies the entity to fetch
+     * @return the response returned by the Atlas client
+     * @throws AtlasServiceException if the Atlas request fails
+     */
+    public AtlasEntity.AtlasEntityWithExtInfo searchEntityDef(AtlasObjectId id) throws AtlasServiceException {
+        final String guid = id.getGuid();
+        if (StringUtils.isEmpty(guid)) {
+            // No GUID available, fall back to the unique attributes rendered as strings.
+            final Map<String, String> uniqueAttributes = new HashMap<>();
+            for (final Map.Entry<String, Object> attribute : id.getUniqueAttributes().entrySet()) {
+                final Object value = attribute.getValue();
+                if (value != null) {
+                    uniqueAttributes.put(attribute.getKey(), value.toString());
+                }
+            }
+            return atlasClient.getEntityByAttribute(id.getTypeName(), uniqueAttributes);
+        }
+        return atlasClient.getEntityByGuid(guid);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
new file mode 100644
index 0000000..43fefff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import 
org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification messages 
within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} 
should be used serially from a single thread.
+ */
+public class NiFiAtlasHook extends AtlasHook implements LineageContext {
+
+    public static final String NIFI_USER = "nifi";
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NiFiAtlasHook.class);
+    private static final String CONF_PREFIX = "atlas.hook.nifi.";
+    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+
+    private final NiFiAtlasClient atlasClient;
+
+    /**
+     * An index to resolve a qualifiedName from a GUID.
+     */
+    private final Map<String, String> guidToQualifiedName;
+    /**
+     * An index to resolve a Referenceable from a typeName::qualifiedName.
+     */
+    private final Map<String, Referenceable> typedQualifiedNameToRef;
+
+
+    private static <K, V> Map<K, V> createCache(final int maxSize) {
+        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                return size() > maxSize;
+            }
+        };
+    }
+
+    /**
+     * @param atlasClient client used to look up existing entities while committing messages
+     */
+    public NiFiAtlasHook(NiFiAtlasClient atlasClient) {
+        this.atlasClient = atlasClient;
+
+        // GUID -> qualifiedName index, bounded to 10,000 entries.
+        this.guidToQualifiedName = createCache(10_000);
+
+        // typeName::qualifiedName -> Referenceable index, bounded to 1,000 entries.
+        this.typedQualifiedNameToRef = createCache(1_000);
+    }
+
+    /**
+     * @return the configuration key "atlas.hook.nifi.numRetries"
+     *         (presumably read by {@code AtlasHook} to decide how often a notification is retried; confirm against AtlasHook)
+     */
+    @Override
+    protected String getNumberOfRetriesPropertyKey() {
+        return HOOK_NUM_RETRIES;
+    }
+
+
+    private final List<HookNotificationMessage> messages = new ArrayList<>();
+
+    /**
+     * Queue a notification message. Nothing is sent until {@link #commitMessages()} is called.
+     *
+     * @param message the message to hold until the next commit
+     */
+    @Override
+    public void addMessage(HookNotificationMessage message) {
+        messages.add(message);
+    }
+
+    /**
+     * Counters gathered while committing messages, reported through the debug log.
+     * This is an inner (non-static) class because the report also includes the sizes of the enclosing hook's caches.
+     */
+    private class Metrics {
+        final long startedAt = System.currentTimeMillis();
+        int totalMessages;
+        int partialNiFiFlowPathUpdates;
+        int dedupedPartialNiFiFlowPathUpdates;
+        int otherMessages;
+        int flowPathSearched;
+        int dataSetSearched;
+        int dataSetCacheHit;
+
+        /**
+         * Write all counters, the elapsed time since this instance was created and the current cache sizes
+         * to the debug log.
+         *
+         * @param message prefix of the log line
+         */
+        private void log(String message) {
+            if (!logger.isDebugEnabled()) {
+                // Avoid the cost of String.format when the line would be discarded anyway.
+                return;
+            }
+            logger.debug(String.format("%s, %d ms passed, totalMessages=%d," +
+                    " partialNiFiFlowPathUpdates=%d, dedupedPartialNiFiFlowPathUpdates=%d, otherMessage=%d," +
+                    " flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%d," +
+                    " guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d",
+                    message, System.currentTimeMillis() - startedAt, totalMessages,
+                    partialNiFiFlowPathUpdates, dedupedPartialNiFiFlowPathUpdates, otherMessages,
+                    flowPathSearched, dataSetSearched, dataSetCacheHit,
+                    guidToQualifiedName.size(), typedQualifiedNameToRef.size()));
+        }
+    }
+
+    public void commitMessages() {
+        final Map<Boolean, List<HookNotificationMessage>> 
partialNiFiFlowPathUpdateAndOthers
+                = messages.stream().collect(Collectors.groupingBy(msg
+                    -> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
+                        && 
TYPE_NIFI_FLOW_PATH.equals(((EntityPartialUpdateRequest)msg).getTypeName())
+                        && 
ATTR_QUALIFIED_NAME.equals(((EntityPartialUpdateRequest)msg).getAttribute())
+        ));
+
+
+        final List<HookNotificationMessage> otherMessages = 
partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(false, k -> 
Collections.emptyList());
+        final List<HookNotificationMessage> partialNiFiFlowPathUpdates = 
partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(true, k -> 
Collections.emptyList());
+        logger.info("Commit messages: {} partialNiFiFlowPathUpdate and {} 
other messages.", partialNiFiFlowPathUpdates.size(), otherMessages.size());
+
+        final Metrics metrics = new Metrics();
+        metrics.totalMessages = messages.size();
+        metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
+        metrics.otherMessages = otherMessages.size();
+
+        try {
+            // Notify other messages first.
+            notifyEntities(otherMessages);
+
+            // De-duplicate messages.
+            final List<HookNotificationMessage> deduplicatedMessages = 
partialNiFiFlowPathUpdates.stream().map(msg -> (EntityPartialUpdateRequest) msg)
+                    // Group by nifi_flow_path qualifiedName value.
+                    
.collect(Collectors.groupingBy(EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
+                    .map(entry -> {
+                        final String flowPathQualifiedName = entry.getKey();
+                        final Map<String, Referenceable> distinctInputs;
+                        final Map<String, Referenceable> distinctOutputs;
+                        final String flowPathGuid;
+                        try {
+                            // Fetch existing nifi_flow_path and its 
inputs/ouputs.
+                            metrics.flowPathSearched++;
+                            final AtlasEntity.AtlasEntityWithExtInfo 
flowPathExt
+                                    = atlasClient.searchEntityDef(new 
AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName));
+                            final AtlasEntity flowPathEntity = 
flowPathExt.getEntity();
+                            flowPathGuid = flowPathEntity.getGuid();
+                            distinctInputs = 
toReferenceables(flowPathEntity.getAttribute(ATTR_INPUTS), metrics);
+                            distinctOutputs = 
toReferenceables(flowPathEntity.getAttribute(ATTR_OUTPUTS), metrics);
+
+                        } catch (AtlasServiceException e) {
+                            if 
(ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
+                                logger.debug("nifi_flow_path was not found for 
qualifiedName {}", flowPathQualifiedName);
+                            } else {
+                                logger.warn("Failed to retrieve nifi_flow_path 
with qualifiedName {} due to {}", flowPathQualifiedName, e, e);
+                            }
+                            return null;
+                        }
+
+                        // Merge all inputs and outputs for this 
nifi_flow_path.
+                        for (EntityPartialUpdateRequest msg : 
entry.getValue()) {
+                            
fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics)
+                                    .entrySet().stream().filter(ref -> 
!distinctInputs.containsKey(ref.getKey()))
+                                    .forEach(ref -> 
distinctInputs.put(ref.getKey(), ref.getValue()));
+
+                            
fromReferenceable(msg.getEntity().get(ATTR_OUTPUTS), metrics)
+                                    .entrySet().stream().filter(ref -> 
!distinctOutputs.containsKey(ref.getKey()))
+                                    .forEach(ref -> 
distinctOutputs.put(ref.getKey(), ref.getValue()));
+                        }
+
+                        // Consolidate messages into one.
+                        final Referenceable flowPathRef = new 
Referenceable(flowPathGuid, TYPE_NIFI_FLOW_PATH, null);
+                        // NOTE: distinctInputs.values() returns 
HashMap$Values, which causes following error. To avoid that, wrap with 
ArrayList:
+                        // org.json4s.package$MappingException: Can't find 
ScalaSig for class org.apache.atlas.typesystem.Referenceable
+                        flowPathRef.set(ATTR_INPUTS, new 
ArrayList<>(distinctInputs.values()));
+                        flowPathRef.set(ATTR_OUTPUTS, new 
ArrayList<>(distinctOutputs.values()));
+                        return new EntityPartialUpdateRequest(NIFI_USER, 
TYPE_NIFI_FLOW_PATH,
+                                ATTR_QUALIFIED_NAME, flowPathQualifiedName, 
flowPathRef);
+                    })
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+
+            metrics.dedupedPartialNiFiFlowPathUpdates = 
deduplicatedMessages.size();
+            notifyEntities(deduplicatedMessages);
+        } finally {
+            metrics.log("Committed");
+            messages.clear();
+        }
+    }
+
+    /**
+     * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable keyed by qualifiedName.</p>
+     * <p>Atlas removes existing references that are not specified when a collection attribute is updated.
+     * In order to preserve existing DataSet references, existing elements should be passed within a partial update message.</p>
+     * <p>This method also populates the entity caches for subsequent lookups. References whose entity cannot be
+     * retrieved from Atlas are left out of the result. If several references resolve to the same qualifiedName,
+     * the first one is kept.</p>
+     * @param _refs Contains references from an existing nifi_flow_path entity inputs or outputs attribute.
+     * @param metrics Counters updated with cache hits and Atlas lookups.
+     * @return A mutable map of Referenceables keyed by qualifiedName.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, Referenceable> toReferenceables(Object _refs, Metrics metrics) {
+        // NOTE: The returned map may be used to add new Referenceables. Can not be Collections.emptyMap which does not support addition.
+        final Map<String, Referenceable> referenceables = new HashMap<>();
+        if (_refs == null) {
+            return referenceables;
+        }
+
+        final List<Map<String, Object>> refs = (List<Map<String, Object>>) _refs;
+        for (Map<String, Object> ref : refs) {
+            // Existing reference should have a GUID.
+            final String typeName = (String) ref.get(ATTR_TYPENAME);
+            final String guid = (String) ref.get(ATTR_GUID);
+
+            if (guidToQualifiedName.containsKey(guid)) {
+                metrics.dataSetCacheHit++;
+            }
+
+            final String refQualifiedName = guidToQualifiedName.computeIfAbsent(guid, k -> {
+                try {
+                    metrics.dataSetSearched++;
+                    final AtlasEntity.AtlasEntityWithExtInfo refExt = atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
+                    final String qualifiedName = (String) refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
+                    typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP));
+                    return qualifiedName;
+                } catch (AtlasServiceException e) {
+                    if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
+                        logger.warn("{} entity was not found for guid {}", typeName, guid);
+                    } else {
+                        // The exception is passed twice: once to fill the last placeholder, once as the throwable.
+                        logger.warn("Failed to retrieve {} with guid {} due to {}", typeName, guid, e, e);
+                    }
+                    return null;
+                }
+            });
+
+            if (refQualifiedName == null) {
+                continue;
+            }
+
+            // The two caches are bounded independently, so the qualifiedName may still be cached after the
+            // Referenceable was evicted. Rebuild it from the GUID instead of dropping the reference,
+            // because a dropped reference would be removed from the nifi_flow_path in Atlas.
+            final Referenceable referenceable = typedQualifiedNameToRef.computeIfAbsent(
+                    toTypedQualifiedName(typeName, refQualifiedName),
+                    k -> new Referenceable(guid, typeName, Collections.EMPTY_MAP));
+
+            // Keep the first reference for a qualifiedName rather than failing on a duplicate key.
+            referenceables.putIfAbsent(refQualifiedName, referenceable);
+        }
+        return referenceables;
+    }
+
+    /**
+     * Convert the inputs or outputs carried by a pending partial update message to a map of Referenceable
+     * keyed by qualifiedName, substituting the cached Referenceable for the same typeName::qualifiedName
+     * when there is one. If several references share a qualifiedName, the first one is kept.
+     *
+     * @param _refs a list of Referenceable taken from a message, or null
+     * @param metrics not used by this method
+     * @return a map of Referenceables keyed by qualifiedName; an immutable empty map if {@code _refs} is null
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, Referenceable> fromReferenceable(Object _refs, Metrics metrics) {
+        if (_refs == null) {
+            return Collections.emptyMap();
+        }
+
+        final List<Referenceable> refs = (List<Referenceable>) _refs;
+        final Map<String, Referenceable> referenceables = new HashMap<>();
+        for (Referenceable ref : refs) {
+            // This ref is created within this reporting cycle, and it may not have GUID assigned yet, if it is a brand new reference.
+            // If the cache already has a Referenceable for the same typed qualifiedName, then use that one.
+            // Brand new Referenceables have to have all mandatory attributes.
+            final String typeName = ref.getTypeName();
+            final Id id = ref.getId();
+            final String refQualifiedName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName);
+
+            final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
+                if (id.isAssigned()) {
+                    // If this referenceable has Guid assigned, then index its qualifiedName by GUID as well.
+                    guidToQualifiedName.put(id._getId(), refQualifiedName);
+                }
+                // computeIfAbsent stores the returned value itself. The mapping function must not modify
+                // typedQualifiedNameToRef: HashMap rejects that with ConcurrentModificationException on Java 9+.
+                return ref;
+            });
+
+            // Keep the first reference for a qualifiedName rather than failing on a duplicate key.
+            referenceables.putIfAbsent(refQualifiedName, refFromCacheIfAvailable);
+        }
+        return referenceables;
+    }
+
+    /**
+     * Close the notification interface, if there is one.
+     */
+    public void close() {
+        if (notificationInterface == null) {
+            return;
+        }
+        notificationInterface.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java
new file mode 100644
index 0000000..c3920c8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.isGuidAssigned;
+import static org.apache.nifi.atlas.AtlasUtils.isUpdated;
+import static org.apache.nifi.atlas.AtlasUtils.updateMetadata;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiFlow {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NiFiFlow.class);
+
+    private final String rootProcessGroupId;
+    private String flowName;
+    private String clusterName;
+    private String url;
+    private String atlasGuid;
+    private AtlasEntity exEntity;
+    private AtlasObjectId atlasObjectId;
+    private String description;
+
+    /**
+     * Track whether this instance has metadata updated and should be updated 
in Atlas.
+     */
+    private AtomicBoolean metadataUpdated = new AtomicBoolean(false);
+    private List<String> updateAudit = new ArrayList<>();
+    private Set<String> updatedEntityGuids = new LinkedHashSet<>();
+    private Set<String> stillExistingEntityGuids = new LinkedHashSet<>();
+    private Set<String> traversedPathIds = new LinkedHashSet<>();
+    private boolean urlUpdated = false;
+
+    private final Map<String, NiFiFlowPath> flowPaths = new HashMap<>();
+    private final Map<String, ProcessorStatus> processors = new HashMap<>();
+    private final Map<String, RemoteProcessGroupStatus> remoteProcessGroups = 
new HashMap<>();
+    private final Map<String, List<ConnectionStatus>> incomingConnections = 
new HashMap<>();
+    private final Map<String, List<ConnectionStatus>> outGoingConnections = 
new HashMap<>();
+
+    private final Map<AtlasObjectId, AtlasEntity> queues = new HashMap<>();
+    // Any Ports.
+    private final Map<String, PortStatus> inputPorts = new HashMap<>();
+    private final Map<String, PortStatus> outputPorts = new HashMap<>();
+    // Root Group Ports.
+    private final Map<String, PortStatus> rootInputPorts = new HashMap<>();
+    private final Map<String, PortStatus> rootOutputPorts = new HashMap<>();
+    // Root Group Ports Entity.
+    private final Map<AtlasObjectId, AtlasEntity> rootInputPortEntities = new 
HashMap<>();
+    private final Map<AtlasObjectId, AtlasEntity> rootOutputPortEntities = new 
HashMap<>();
+
+
+    /**
+     * @param rootProcessGroupId id of the root process group; the flow's qualified name is derived from it
+     */
+    public NiFiFlow(String rootProcessGroupId) {
+        this.rootProcessGroupId = rootProcessGroupId;
+    }
+
+    /**
+     * @return the Atlas object id of this flow; null until a cluster name or an Atlas GUID has been set,
+     *         since the id is only built by those setters
+     */
+    public AtlasObjectId getAtlasObjectId() {
+        return atlasObjectId;
+    }
+
+    /** @return the root process group id given at construction */
+    public String getRootProcessGroupId() {
+        return rootProcessGroupId;
+    }
+
+    /** @return the cluster name used when building qualified names */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /**
+     * Set the cluster name, recording the change through updateMetadata, and rebuild the Atlas object id
+     * because the flow's qualified name is derived from the cluster name.
+     */
+    public void setClusterName(String clusterName) {
+        updateMetadata(metadataUpdated, updateAudit, ATTR_CLUSTER_NAME, 
this.clusterName, clusterName);
+        this.clusterName = clusterName;
+        atlasObjectId = createAtlasObjectId();
+    }
+
+    /**
+     * Build the object id of this flow from the current Atlas GUID (possibly null) and the current qualified name.
+     */
+    private AtlasObjectId createAtlasObjectId() {
+        return new AtlasObjectId(atlasGuid, TYPE_NIFI_FLOW, 
Collections.singletonMap(ATTR_QUALIFIED_NAME, getQualifiedName()));
+    }
+
+    /** @return the entity previously passed to {@link #setExEntity(AtlasEntity)}, or null */
+    public AtlasEntity getExEntity() {
+        return exEntity;
+    }
+
+    /**
+     * Remember the given entity and adopt its GUID as this flow's Atlas GUID.
+     *
+     * @param exEntity must not be null, as its GUID is read
+     */
+    public void setExEntity(AtlasEntity exEntity) {
+        this.exEntity = exEntity;
+        this.setAtlasGuid(exEntity.getGuid());
+    }
+
+    /** @return the Atlas GUID of this flow, or null if none has been set */
+    public String getAtlasGuid() {
+        return atlasGuid;
+    }
+
+    /**
+     * Set the Atlas GUID and rebuild the Atlas object id so that it carries the new GUID.
+     */
+    public void setAtlasGuid(String atlasGuid) {
+        this.atlasGuid = atlasGuid;
+        atlasObjectId = createAtlasObjectId();
+    }
+
+    /** @return the qualified name of this flow, derived from the root process group id */
+    public String getQualifiedName() {
+        return toQualifiedName(rootProcessGroupId);
+    }
+
+    /** @return the description of this flow */
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Set the description, recording the change through updateMetadata.
+     */
+    public void setDescription(String description) {
+        updateMetadata(metadataUpdated, updateAudit, ATTR_DESCRIPTION, 
this.description, description);
+        this.description = description;
+    }
+
+    /**
+     * Register a connection under both of its ends: as outgoing for its source id
+     * and as incoming for its destination id.
+     */
+    public void addConnection(ConnectionStatus c) {
+        outGoingConnections.computeIfAbsent(c.getSourceId(), k -> new 
ArrayList<>()).add(c);
+        incomingConnections.computeIfAbsent(c.getDestinationId(), k -> new 
ArrayList<>()).add(c);
+    }
+
+    /** Register a processor, keyed by its id. */
+    public void addProcessor(ProcessorStatus p) {
+        processors.put(p.getId(), p);
+    }
+
+    /** @return the internal, mutable map of processors keyed by id */
+    public Map<String, ProcessorStatus> getProcessors() {
+        return processors;
+    }
+
+    /** Register a remote process group, keyed by its id. */
+    public void addRemoteProcessGroup(RemoteProcessGroupStatus r) {
+        remoteProcessGroups.put(r.getId(), r);
+    }
+
+    /**
+     * Set the flow name, recording the change through updateMetadata.
+     */
+    public void setFlowName(String flowName) {
+        updateMetadata(metadataUpdated, updateAudit, ATTR_NAME, this.flowName, 
flowName);
+        this.flowName = flowName;
+    }
+
+    /** @return the flow name */
+    public String getFlowName() {
+        return flowName;
+    }
+
+    /**
+     * Set the URL, recording the change through updateMetadata and raising the urlUpdated flag
+     * when the new value differs from the current one according to isUpdated.
+     */
+    public void setUrl(String url) {
+        final String previousUrl = this.url;
+        updateMetadata(metadataUpdated, updateAudit, ATTR_URL, previousUrl, url);
+        // The flag is sticky: once raised it stays raised until change tracking is restarted.
+        this.urlUpdated |= isUpdated(previousUrl, url);
+        this.url = url;
+    }
+
+    /** @return the URL of this flow */
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * @return connections whose destination is the given component,
+     *         or null if no such connection has been added
+     */
+    public List<ConnectionStatus> getIncomingConnections(String componentId) {
+        return incomingConnections.get(componentId);
+    }
+
+    /**
+     * @return connections whose source is the given component,
+     *         or null if no such connection has been added
+     */
+    public List<ConnectionStatus> getOutgoingConnections(String componentId) {
+        return outGoingConnections.get(componentId);
+    }
+
+    /** Register an input port, keyed by its id. */
+    public void addInputPort(PortStatus port) {
+        inputPorts.put(port.getId(), port);
+    }
+
+    /** @return the internal, mutable map of input ports keyed by id */
+    public Map<String, PortStatus> getInputPorts() {
+        return inputPorts;
+    }
+
+    /** Register an output port, keyed by its id. */
+    public void addOutputPort(PortStatus port) {
+        outputPorts.put(port.getId(), port);
+    }
+
+    /** @return the internal, mutable map of output ports keyed by id */
+    public Map<String, PortStatus> getOutputPorts() {
+        return outputPorts;
+    }
+
+    /**
+     * Register a root group input port and create or update its corresponding port entity.
+     */
+    public void addRootInputPort(PortStatus port) {
+        rootInputPorts.put(port.getId(), port);
+        createOrUpdateRootGroupPortEntity(true, toQualifiedName(port.getId()), 
port.getName());
+    }
+
+    /** @return the internal, mutable map of root group input ports keyed by id */
+    public Map<String, PortStatus> getRootInputPorts() {
+        return rootInputPorts;
+    }
+
+    /**
+     * Register a root group output port and create or update its corresponding port entity.
+     */
+    public void addRootOutputPort(PortStatus port) {
+        rootOutputPorts.put(port.getId(), port);
+        createOrUpdateRootGroupPortEntity(false, 
toQualifiedName(port.getId()), port.getName());
+    }
+
+    /** @return the internal, mutable map of root group output ports keyed by id */
+    public Map<String, PortStatus> getRootOutputPorts() {
+        return rootOutputPorts;
+    }
+
+    /** @return the internal, mutable map of root group input port entities keyed by Atlas object id */
+    public Map<AtlasObjectId, AtlasEntity> getRootInputPortEntities() {
+        return rootInputPortEntities;
+    }
+
+    /**
+     * Make sure a root group port entity exists for the given qualified name.
+     * An existing entity is marked as still existing and, if its name differs, renamed and marked as updated.
+     * Otherwise a new entity is created and registered.
+     *
+     * @param isInput true for an input port, false for an output port
+     * @param qualifiedName qualified name of the port
+     * @param portName current name of the port
+     * @return the existing or newly created entity
+     */
+    private AtlasEntity createOrUpdateRootGroupPortEntity(boolean isInput, String qualifiedName, String portName) {
+        final Map<AtlasObjectId, AtlasEntity> ports;
+        final String typeName;
+        if (isInput) {
+            ports = rootInputPortEntities;
+            typeName = TYPE_NIFI_INPUT_PORT;
+        } else {
+            ports = rootOutputPortEntities;
+            typeName = TYPE_NIFI_OUTPUT_PORT;
+        }
+
+        final Optional<AtlasObjectId> existingPortId = findIdByQualifiedName(ports.keySet(), qualifiedName);
+        if (!existingPortId.isPresent()) {
+            // Unknown port: create a new entity and register it under a qualifiedName based id.
+            final AtlasEntity created = new AtlasEntity(typeName);
+
+            created.setAttribute(ATTR_NIFI_FLOW, getAtlasObjectId());
+            created.setAttribute(ATTR_NAME, portName);
+            created.setAttribute(ATTR_QUALIFIED_NAME, qualifiedName);
+
+            ports.put(new AtlasObjectId(typeName, ATTR_QUALIFIED_NAME, qualifiedName), created);
+            return created;
+        }
+
+        final AtlasEntity existing = ports.get(existingPortId.get());
+        final String portGuid = existing.getGuid();
+        stillExistingEntityGuids.add(portGuid);
+
+        final Object currentName = existing.getAttribute(ATTR_NAME);
+        if (isUpdated(currentName, portName)) {
+            // Update port name and set updated flag.
+            existing.setAttribute(ATTR_NAME, portName);
+            updatedEntityGuids.add(portGuid);
+            updateAudit.add(String.format("Name of %s %s changed from %s to %s", existing.getTypeName(), portGuid, currentName, portName));
+        }
+        return existing;
+    }
+
+    /** @return the internal, mutable map of root group output port entities keyed by Atlas object id */
+    public Map<AtlasObjectId, AtlasEntity> getRootOutputPortEntities() {
+        return rootOutputPortEntities;
+    }
+
+    /**
+     * Return the queue entity feeding the given destination component, creating and registering it if needed.
+     * An existing queue is marked as still existing.
+     *
+     * @param destinationComponentId id of the component the queue leads to
+     * @return the object id and the entity of the queue
+     */
+    public Tuple<AtlasObjectId, AtlasEntity> getOrCreateQueue(String destinationComponentId) {
+        final String qualifiedName = toQualifiedName(destinationComponentId);
+        final Optional<AtlasObjectId> existingQueueId = findIdByQualifiedName(queues.keySet(), qualifiedName);
+
+        if (!existingQueueId.isPresent()) {
+            // Unknown queue: create a new entity and register it under a qualifiedName based id.
+            final AtlasObjectId newQueueId = new AtlasObjectId(TYPE_NIFI_QUEUE, ATTR_QUALIFIED_NAME, qualifiedName);
+            final AtlasEntity newQueue = new AtlasEntity(TYPE_NIFI_QUEUE);
+            newQueue.setAttribute(ATTR_NIFI_FLOW, getAtlasObjectId());
+            newQueue.setAttribute(ATTR_QUALIFIED_NAME, qualifiedName);
+            newQueue.setAttribute(ATTR_NAME, "queue");
+            newQueue.setAttribute(ATTR_DESCRIPTION, "Input queue for " + destinationComponentId);
+            queues.put(newQueueId, newQueue);
+            return new Tuple<>(newQueueId, newQueue);
+        }
+
+        final AtlasObjectId queueId = existingQueueId.get();
+        final AtlasEntity queue = queues.get(queueId);
+        stillExistingEntityGuids.add(queue.getGuid());
+        return new Tuple<>(queueId, queue);
+    }
+
+    /** @return the internal, mutable map of queue entities keyed by Atlas object id */
+    public Map<AtlasObjectId, AtlasEntity> getQueues() {
+        return queues;
+    }
+
+    /** @return the internal, mutable map of flow paths keyed by path id */
+    public Map<String, NiFiFlowPath> getFlowPaths() {
+        return flowPaths;
+    }
+
+    /**
+     * Find a flow_path that contains specified componentId.
+     *
+     * @return the first flow path whose process component ids contain the id, or null if there is none
+     */
+    public NiFiFlowPath findPath(String componentId) {
+        return flowPaths.values().stream()
+                .filter(path -> path.getProcessComponentIds().contains(componentId))
+                .findFirst()
+                .orElse(null);
+    }
+
+    /**
+     * Determine if a component should be reported as NiFiFlowPath.
+     * That is the case for processors and for root group input and output ports.
+     */
+    public boolean isProcessComponent(String componentId) {
+        return isProcessor(componentId) || isRootInputPort(componentId) || 
isRootOutputPort(componentId);
+    }
+
+    /** @return true if a processor with this id has been added */
+    public boolean isProcessor(String componentId) {
+        return processors.containsKey(componentId);
+    }
+
+    /** @return true if an input port with this id has been added */
+    public boolean isInputPort(String componentId) {
+        return inputPorts.containsKey(componentId);
+    }
+
+    /** @return true if an output port with this id has been added */
+    public boolean isOutputPort(String componentId) {
+        return outputPorts.containsKey(componentId);
+    }
+
+    /** @return true if a root group input port with this id has been added */
+    public boolean isRootInputPort(String componentId) {
+        return rootInputPorts.containsKey(componentId);
+    }
+
+    /** @return true if a root group output port with this id has been added */
+    public boolean isRootOutputPort(String componentId) {
+        return rootOutputPorts.containsKey(componentId);
+    }
+
+    /**
+     * Resolve the name of a processor or root group port, falling back to "unknown".
+     */
+    public String getProcessComponentName(String componentId) {
+        final Supplier<String> unknownName = () -> "unknown";
+        return getProcessComponentName(componentId, unknownName);
+    }
+
+    /**
+     * Resolve the name of a processor or root group port.
+     *
+     * @param componentId id of the component to look up
+     * @param unknown supplies the result when the id is neither a processor nor a root group port
+     */
+    public String getProcessComponentName(String componentId, Supplier<String> unknown) {
+        if (isProcessor(componentId)) {
+            return getProcessors().get(componentId).getName();
+        }
+        if (isRootInputPort(componentId)) {
+            return getRootInputPorts().get(componentId).getName();
+        }
+        if (isRootOutputPort(componentId)) {
+            return getRootOutputPorts().get(componentId).getName();
+        }
+        return unknown.get();
+    }
+
+    /**
+     * Start tracking changes from current state.
+     * Resets the metadata-updated flag, the audit log, the updated and still-existing GUID sets and the URL-updated flag.
+     * NOTE(review): traversedPathIds is not reset here; confirm whether that is intended.
+     */
+    public void startTrackingChanges() {
+        this.metadataUpdated.set(false);
+        this.updateAudit.clear();
+        this.updatedEntityGuids.clear();
+        this.stillExistingEntityGuids.clear();
+        this.urlUpdated = false;
+    }
+
+    /** @return the current value of the metadata-updated flag */
+    public boolean isMetadataUpdated() {
+        return this.metadataUpdated.get();
+    }
+
+    /**
+     * @return the qualified name built by AtlasUtils from the current cluster name and the given component id
+     */
+    public String toQualifiedName(String componentId) {
+        return AtlasUtils.toQualifiedName(clusterName, componentId);
+    }
+
+    /**
+     * How an entity differs from what has been tracked so far.
+     */
+    public enum EntityChangeType {
+        AS_IS,
+        CREATED,
+        UPDATED,
+        DELETED;
+
+        /**
+         * @return true if the collection contains any type other than {@link #AS_IS}
+         */
+        public static boolean containsChange(Collection<EntityChangeType> types) {
+            return Stream.of(CREATED, UPDATED, DELETED).anyMatch(types::contains);
+        }
+    }
+
+    /**
+     * Classify a DataSet entity by its GUID: CREATED if no GUID is assigned, UPDATED if it was marked as updated,
+     * DELETED if it was not marked as still existing, AS_IS otherwise.
+     */
+    private EntityChangeType getEntityChangeType(String guid) {
+        if (!isGuidAssigned(guid)) {
+            return EntityChangeType.CREATED;
+        }
+        if (updatedEntityGuids.contains(guid)) {
+            return EntityChangeType.UPDATED;
+        }
+        if (!stillExistingEntityGuids.contains(guid)) {
+            return EntityChangeType.DELETED;
+        }
+        return EntityChangeType.AS_IS;
+    }
+
+    /**
+     * Groups the root input port, root output port and queue entities by
+     * their change type, and appends the CREATED, UPDATED and DELETED groups
+     * to the update audit.
+     *
+     * @return entities keyed by change type; a change type that no entity
+     *         falls into is absent from the map
+     */
+    public Map<EntityChangeType, List<AtlasEntity>> getChangedDataSetEntities() {
+        // Same encounter order as before: input ports, output ports, queues.
+        final Stream<AtlasEntity> rootPorts = Stream.concat(
+                rootInputPortEntities.values().stream(),
+                rootOutputPortEntities.values().stream());
+        final Stream<AtlasEntity> dataSets = Stream.concat(rootPorts, queues.values().stream());
+
+        final Map<EntityChangeType, List<AtlasEntity>> byChangeType = dataSets
+                .collect(Collectors.groupingBy(entity -> getEntityChangeType(entity.getGuid())));
+
+        final List<AtlasEntity> created = byChangeType.get(EntityChangeType.CREATED);
+        final List<AtlasEntity> updated = byChangeType.get(EntityChangeType.UPDATED);
+        final List<AtlasEntity> deleted = byChangeType.get(EntityChangeType.DELETED);
+        updateAudit.add("CREATED DataSet entities=" + created);
+        updateAudit.add("UPDATED DataSet entities=" + updated);
+        updateAudit.add("DELETED DataSet entities=" + deleted);
+        return byChangeType;
+    }
+
+    /**
+     * Records the path id as traversed and returns the flow path stored
+     * under it, creating and storing a new one when none exists yet.
+     *
+     * @param pathId id of the flow path
+     * @return the existing or newly created flow path
+     */
+    public NiFiFlowPath getOrCreateFlowPath(String pathId) {
+        traversedPathIds.add(pathId);
+        return flowPaths.computeIfAbsent(pathId, NiFiFlowPath::new);
+    }
+
+    /**
+     * @param pathId id of the flow path
+     * @return true if the id is in {@code traversedPathIds}
+     */
+    public boolean isTraversedPath(String pathId) {
+        final boolean traversed = traversedPathIds.contains(pathId);
+        return traversed;
+    }
+
+    /**
+     * Classifies a flow path.
+     *
+     * @param path flow path to classify
+     * @return CREATED when the path has no existing entity, UPDATED when its
+     *         metadata or the URL was updated, DELETED when its id was not
+     *         traversed, AS_IS otherwise
+     */
+    private EntityChangeType getFlowPathChangeType(NiFiFlowPath path) {
+        if (path.getExEntity() == null) {
+            return EntityChangeType.CREATED;
+        }
+        if (path.isMetadataUpdated() || urlUpdated) {
+            return EntityChangeType.UPDATED;
+        }
+        final boolean traversed = traversedPathIds.contains(path.getId());
+        return traversed ? EntityChangeType.AS_IS : EntityChangeType.DELETED;
+    }
+
+    /**
+     * Classifies an input/output reference of a flow path.
+     *
+     * @param id object id of the referenced DataSet
+     * @return CREATED when {@code isGuidAssigned} is false for its GUID,
+     *         DELETED when it is a queue held in {@code queues} whose GUID is
+     *         not in {@code stillExistingEntityGuids}, AS_IS otherwise
+     */
+    private EntityChangeType getFlowPathIOChangeType(AtlasObjectId id) {
+        final String guid = id.getGuid();
+        if (!isGuidAssigned(guid)) {
+            return EntityChangeType.CREATED;
+        }
+
+        // Only a queue owned by this NiFiFlow can be reported as deleted,
+        // because NiFiFlow knows which of its queues are still active.
+        final boolean ownedQueue = TYPE_NIFI_QUEUE.equals(id.getTypeName()) && queues.containsKey(id);
+        if (ownedQueue && !stillExistingEntityGuids.contains(guid)) {
+            return EntityChangeType.DELETED;
+        }
+
+        // Anything else does not need to be deleted.
+        return EntityChangeType.AS_IS;
+    }
+
+    /**
+     * Builds the Atlas entity for a flow path and decides its final change
+     * type. As a side effect the path's name is set to the joined component
+     * names.
+     *
+     * @param changeType change type computed for the path so far
+     * @param path flow path to convert
+     * @return the final change type paired with the populated entity; an
+     *         AS_IS input is promoted to UPDATED when the path metadata or
+     *         any input/output reference changed
+     */
+    private Tuple<EntityChangeType, AtlasEntity> toAtlasEntity(EntityChangeType changeType, final NiFiFlowPath path) {
+
+        // A CREATED path starts from a blank entity; any other change type
+        // builds the entity from the one the path already holds.
+        final AtlasEntity entity;
+        if (changeType == EntityChangeType.CREATED) {
+            entity = new AtlasEntity();
+        } else {
+            entity = new AtlasEntity(path.getExEntity());
+        }
+        entity.setTypeName(TYPE_NIFI_FLOW_PATH);
+        entity.setVersion(1L);
+        entity.setAttribute(ATTR_NIFI_FLOW, getAtlasObjectId());
+
+        // name lists the component names, description lists "name::id" pairs.
+        final StringBuilder names = new StringBuilder();
+        final StringBuilder descriptions = new StringBuilder();
+        for (final String pid : path.getProcessComponentIds()) {
+            final String componentName = getProcessComponentName(pid);
+
+            // A separator goes in only once some name text has been written.
+            if (names.length() != 0) {
+                names.append(", ");
+                descriptions.append(", ");
+            }
+            names.append(componentName);
+            descriptions.append(String.format("%s::%s", componentName, pid));
+        }
+
+        final String pathName = names.toString();
+        path.setName(pathName);
+        entity.setAttribute(ATTR_NAME, pathName);
+        entity.setAttribute(ATTR_DESCRIPTION, descriptions.toString());
+
+        // Use first processor's id as qualifiedName.
+        entity.setAttribute(ATTR_QUALIFIED_NAME, toQualifiedName(path.getId()));
+
+        entity.setAttribute(ATTR_URL, path.createDeepLinkURL(getUrl()));
+
+        final boolean inputsChanged = setChangedIOIds(path, entity, true);
+        final boolean outputsChanged = setChangedIOIds(path, entity, false);
+
+        // Even if no flow path metadata changed, the path should be updated
+        // when any IO is changed.
+        final EntityChangeType finalChangeType;
+        if (changeType != EntityChangeType.AS_IS) {
+            finalChangeType = changeType;
+        } else if (path.isMetadataUpdated() || inputsChanged || outputsChanged) {
+            finalChangeType = EntityChangeType.UPDATED;
+        } else {
+            finalChangeType = EntityChangeType.AS_IS;
+        }
+
+        return new Tuple<>(finalChangeType, entity);
+    }
+
+    /**
+     * Set input or output DataSet ids for a NiFiFlowPath.
+     * References classified as DELETED are dropped; the attribute is only
+     * written when the remaining references differ from the ones the path
+     * reports as existing.
+     *
+     * @param path flow path whose inputs or outputs are examined
+     * @param pathEntity entity receiving the inputs/outputs attribute
+     * @param isInput true to process inputs, false to process outputs
+     * @return True if the references changed and the attribute was set.
+     */
+    private boolean setChangedIOIds(NiFiFlowPath path, AtlasEntity pathEntity, boolean isInput) {
+        final Set<AtlasObjectId> ids;
+        final String targetAttribute;
+        if (isInput) {
+            ids = path.getInputs();
+            targetAttribute = ATTR_INPUTS;
+        } else {
+            ids = path.getOutputs();
+            targetAttribute = ATTR_OUTPUTS;
+        }
+
+        // Classify every reference, then keep the ones that are not DELETED.
+        final Map<EntityChangeType, List<AtlasObjectId>> idsByChangeType = ids.stream()
+                .collect(Collectors.groupingBy(this::getFlowPathIOChangeType));
+        final Set<AtlasObjectId> remainingIds = toRemainingFlowPathIOIds(idsByChangeType);
+
+        // Only touch the entity when the references are actually different.
+        final boolean changed = path.isDataSetReferenceChanged(remainingIds, isInput);
+        if (changed) {
+            pathEntity.setAttribute(targetAttribute, remainingIds);
+        }
+        return changed;
+    }
+
+    /**
+     * Flattens the grouped ids into one set, leaving out the DELETED group.
+     *
+     * @param ids object ids grouped by change type
+     * @return every id whose group key is not DELETED
+     */
+    private Set<AtlasObjectId> toRemainingFlowPathIOIds(Map<EntityChangeType, List<AtlasObjectId>> ids) {
+        return ids.entrySet().stream()
+                .filter(group -> group.getKey() != EntityChangeType.DELETED)
+                .map(Map.Entry::getValue)
+                .flatMap(List::stream)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Converts every known flow path to an Atlas entity, groups the entities
+     * by their final change type, and appends the CREATED, UPDATED and
+     * DELETED groups to the update audit.
+     *
+     * @return flow path entities keyed by change type; a change type that no
+     *         path falls into is absent from the map
+     */
+    public Map<EntityChangeType, List<AtlasEntity>> getChangedFlowPathEntities() {
+        // Convert NiFiFlowPath to AtlasEntity.
+        final HashMap<EntityChangeType, List<AtlasEntity>> changedPaths = flowPaths.values().stream()
+                .map(path -> {
+                    final EntityChangeType changeType = getFlowPathChangeType(path);
+                    final Tuple<EntityChangeType, AtlasEntity> converted;
+                    if (changeType == EntityChangeType.DELETED) {
+                        // A deleted path is reported with its existing entity as is.
+                        converted = new Tuple<>(changeType, path.getExEntity());
+                    } else {
+                        converted = toAtlasEntity(changeType, path);
+                    }
+                    return converted;
+                })
+                .collect(Collectors.groupingBy(Tuple::getKey, HashMap::new,
+                        Collectors.mapping(Tuple::getValue, Collectors.toList())));
+
+        final List<AtlasEntity> created = changedPaths.get(EntityChangeType.CREATED);
+        final List<AtlasEntity> updated = changedPaths.get(EntityChangeType.UPDATED);
+        final List<AtlasEntity> deleted = changedPaths.get(EntityChangeType.DELETED);
+        updateAudit.add("CREATED NiFiFlowPath=" + created);
+        updateAudit.add("UPDATED NiFiFlowPath=" + updated);
+        updateAudit.add("DELETED NiFiFlowPath=" + deleted);
+        return changedPaths;
+    }
+
+    /**
+     * @return the update audit list itself, not a copy
+     */
+    public List<String> getUpdateAudit() {
+        final List<String> audit = this.updateAudit;
+        return audit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java
new file mode 100644
index 0000000..777c35a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Registers the components found in NiFi status snapshots with a
+ * {@link NiFiFlow} and splits the registered flow into {@link NiFiFlowPath}s.
+ *
+ * <p>The private graph walks that pass through components which are not
+ * process components keep track of the ids on the current recursion branch,
+ * so that a loop made only of such components cannot recurse forever.</p>
+ */
+public class NiFiFlowAnalyzer {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiFlowAnalyzer.class);
+
+    /**
+     * Registers every component of the given process group hierarchy with
+     * the flow, then registers the ports of the given group as root ports.
+     *
+     * @param nifiFlow flow to populate
+     * @param rootProcessGroup status of the root process group
+     */
+    public void analyzeProcessGroup(NiFiFlow nifiFlow, ProcessGroupStatus rootProcessGroup) {
+        analyzeProcessGroup(rootProcessGroup, nifiFlow);
+        analyzeRootGroupPorts(nifiFlow, rootProcessGroup);
+    }
+
+    /**
+     * Registers the input and output ports of the given group as root ports.
+     */
+    private void analyzeRootGroupPorts(NiFiFlow nifiFlow, ProcessGroupStatus rootProcessGroup) {
+        rootProcessGroup.getInputPortStatus().forEach(port -> nifiFlow.addRootInputPort(port));
+        rootProcessGroup.getOutputPortStatus().forEach(port -> nifiFlow.addRootOutputPort(port));
+    }
+
+    /**
+     * Registers connections, processors, remote process groups and ports of
+     * the given group, then does the same for each child group.
+     */
+    private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {
+
+        processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
+        processGroupStatus.getProcessorStatus().forEach(p -> nifiFlow.addProcessor(p));
+        processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
+        processGroupStatus.getInputPortStatus().forEach(p -> nifiFlow.addInputPort(p));
+        processGroupStatus.getOutputPortStatus().forEach(p -> nifiFlow.addOutputPort(p));
+
+        // Analyze child ProcessGroups recursively.
+        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
+            analyzeProcessGroup(child, nifiFlow);
+        }
+
+    }
+
+    /**
+     * Collects the ids of the processors feeding the given connections,
+     * looking through sources that are not processors.
+     *
+     * @param nifiFlow flow holding the registered components
+     * @param incomingConnections connections to inspect, may be null
+     * @return processor ids, one entry per route that reaches a processor
+     */
+    private List<String> getIncomingProcessorsIds(NiFiFlow nifiFlow, List<ConnectionStatus> incomingConnections) {
+        return getIncomingProcessorsIds(nifiFlow, incomingConnections, new ArrayList<>());
+    }
+
+    /**
+     * Recursive part of the upstream walk.
+     *
+     * @param visiting ids of the non-processor sources on the current
+     *                 recursion branch; a source already listed is not
+     *                 entered again, which is what stops a loop
+     */
+    private List<String> getIncomingProcessorsIds(NiFiFlow nifiFlow, List<ConnectionStatus> incomingConnections,
+                                                  List<String> visiting) {
+        if (incomingConnections == null) {
+            return Collections.emptyList();
+        }
+
+        final List<String> ids = new ArrayList<>();
+
+        for (ConnectionStatus c : incomingConnections) {
+            final String sourceId = c.getSourceId();
+            // Ignore self relationship.
+            if (sourceId.equals(c.getDestinationId())) {
+                continue;
+            }
+
+            if (nifiFlow.isProcessor(sourceId)) {
+                ids.add(sourceId);
+            } else if (!visiting.contains(sourceId)) {
+                // Track the source only while its own branch is being walked,
+                // so separate routes through it are still counted separately.
+                visiting.add(sourceId);
+                ids.addAll(getIncomingProcessorsIds(nifiFlow, nifiFlow.getIncomingConnections(sourceId), visiting));
+                visiting.remove(visiting.size() - 1);
+            }
+        }
+
+        return ids;
+    }
+
+    /**
+     * Finds the process components that follow the given component, looking
+     * through destinations that are not process components. Destinations
+     * already contained in the path are skipped.
+     *
+     * @param nifiFlow flow holding the registered components
+     * @param path flow path currently being built
+     * @param componentId id of the component to start from
+     * @return ids of the next process components, possibly empty
+     */
+    private List<String> getNextProcessComponent(NiFiFlow nifiFlow, NiFiFlowPath path, String componentId) {
+        final List<String> visiting = new ArrayList<>();
+        visiting.add(componentId);
+        return getNextProcessComponent(nifiFlow, path, componentId, visiting);
+    }
+
+    /**
+     * Recursive part of the downstream walk.
+     *
+     * @param visiting ids of the components on the current recursion branch;
+     *                 a destination already listed is not entered again,
+     *                 which is what stops a loop
+     */
+    private List<String> getNextProcessComponent(NiFiFlow nifiFlow, NiFiFlowPath path, String componentId,
+                                                 List<String> visiting) {
+        final List<ConnectionStatus> outs = nifiFlow.getOutgoingConnections(componentId);
+        if (outs == null || outs.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        final List<String> nextProcessComponent = new ArrayList<>();
+        for (ConnectionStatus out : outs) {
+            final String destinationId = out.getDestinationId();
+            if (path.getProcessComponentIds().contains(destinationId)) {
+                // If the connection is pointing back to current path, then skip it to avoid loop.
+                continue;
+            }
+
+            if (nifiFlow.isProcessComponent(destinationId)) {
+                nextProcessComponent.add(destinationId);
+            } else if (!visiting.contains(destinationId)) {
+                visiting.add(destinationId);
+                nextProcessComponent.addAll(getNextProcessComponent(nifiFlow, path, destinationId, visiting));
+                visiting.remove(visiting.size() - 1);
+            }
+        }
+        return nextProcessComponent;
+    }
+
+    /**
+     * Extends the given path starting from the given component, and starts
+     * new paths at joint points.
+     *
+     * @param nifiFlow flow holding the registered components
+     * @param path flow path currently being built
+     * @param componentId id of the component to visit
+     */
+    private void traverse(NiFiFlow nifiFlow, NiFiFlowPath path, String componentId) {
+
+        // If the pid is RootInputPort of the same NiFi instance, then stop traversing to create separate self S2S path.
+        // E.g InputPort -> MergeContent, GenerateFlowFile -> InputPort.
+        if (path.getProcessComponentIds().size() > 0 && nifiFlow.isRootInputPort(componentId)) {
+            return;
+        }
+
+        // Add known inputs/outputs to/from this processor, such as RootGroupIn/Output port
+        if (nifiFlow.isProcessComponent(componentId)) {
+            path.addProcessor(componentId);
+        }
+
+        final List<ConnectionStatus> outs = nifiFlow.getOutgoingConnections(componentId);
+        if (outs == null || outs.isEmpty()) {
+            return;
+        }
+
+        // Analyze destination process components.
+        final List<String> nextProcessComponents = getNextProcessComponent(nifiFlow, path, componentId);
+        for (final String destPid : nextProcessComponents) {
+            if (path.getProcessComponentIds().contains(destPid)) {
+                // Avoid looping back to itself.
+                continue;
+            }
+
+            // If the destination has more than one inputs, or there are multiple destinations,
+            // then it should be treated as a separate flow path.
+            final boolean createJointPoint = nextProcessComponents.size() > 1
+                    || getIncomingProcessorsIds(nifiFlow, nifiFlow.getIncomingConnections(destPid)).size() > 1;
+
+            if (!createJointPoint) {
+                // Normal relation, continue digging.
+                traverse(nifiFlow, path, destPid);
+                continue;
+            }
+
+            // Must be read before the joint point path is looked up below,
+            // because that lookup marks the id as traversed.
+            final boolean alreadyTraversed = nifiFlow.isTraversedPath(destPid);
+
+            // Create an input queue DataSet because Atlas doesn't show lineage if it doesn't have in and out.
+            // This DataSet is also useful to link flowPaths together on Atlas lineage graph.
+            final Tuple<AtlasObjectId, AtlasEntity> queueTuple = nifiFlow.getOrCreateQueue(destPid);
+
+            final AtlasObjectId queueId = queueTuple.getKey();
+            path.getOutputs().add(queueId);
+
+            // If the destination is already traversed once, it doesn't have to be visited again.
+            if (alreadyTraversed) {
+                continue;
+            }
+
+            // Get existing or create new one.
+            final NiFiFlowPath jointPoint = nifiFlow.getOrCreateFlowPath(destPid);
+            jointPoint.getInputs().add(queueId);
+
+            // Start traversing as a new joint point.
+            traverse(nifiFlow, jointPoint, destPid);
+        }
+    }
+
+    /**
+     * Tells whether no process component is found upstream of the given
+     * incoming connections.
+     *
+     * @param nifiFlow flow holding the registered components
+     * @param ins incoming connections of the component, may be null
+     * @return true if there is no incoming connection, or every incoming
+     *         connection leads back only to components that are not process
+     *         components
+     */
+    private boolean isHeadProcessor(NiFiFlow nifiFlow, List<ConnectionStatus> ins) {
+        return isHeadProcessor(nifiFlow, ins, new ArrayList<>());
+    }
+
+    /**
+     * Recursive part of the head check.
+     *
+     * @param visiting ids of the sources on the current recursion branch; a
+     *                 source already listed is not entered again, which is
+     *                 what stops a loop
+     */
+    private boolean isHeadProcessor(NiFiFlow nifiFlow, List<ConnectionStatus> ins, List<String> visiting) {
+        if (ins == null || ins.isEmpty()) {
+            return true;
+        }
+        for (ConnectionStatus in : ins) {
+            // If it has incoming relationship from other process components, then return false.
+            final String sourceId = in.getSourceId();
+            if (nifiFlow.isProcessComponent(sourceId)) {
+                return false;
+            }
+            if (visiting.contains(sourceId)) {
+                // Already being examined further up this branch; walking it
+                // again cannot reveal anything new.
+                continue;
+            }
+            // Check next level.
+            visiting.add(sourceId);
+            final boolean head = isHeadProcessor(nifiFlow, nifiFlow.getIncomingConnections(sourceId), visiting);
+            visiting.remove(visiting.size() - 1);
+            if (!head) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Breaks the registered flow into flow paths, starting from the head
+     * processors and the root input ports, then assigns a group id to every
+     * flow path.
+     *
+     * @param nifiFlow flow whose components have already been registered
+     */
+    public void analyzePaths(NiFiFlow nifiFlow) {
+        final String rootProcessGroupId = nifiFlow.getRootProcessGroupId();
+
+        // Now let's break it into flow paths.
+        final Map<String, ProcessorStatus> processors = nifiFlow.getProcessors();
+        final Set<String> headProcessComponents = processors.keySet().stream()
+                .filter(pid -> {
+                    final List<ConnectionStatus> ins = nifiFlow.getIncomingConnections(pid);
+                    return isHeadProcessor(nifiFlow, ins);
+                })
+                .collect(Collectors.toSet());
+
+        // Use RootInputPorts as headProcessors.
+        headProcessComponents.addAll(nifiFlow.getRootInputPorts().keySet());
+
+        headProcessComponents.forEach(startPid -> {
+            // By using the startPid as its qualifiedName, it's guaranteed that
+            // the same path will end up being the same Atlas entity.
+            // However, if the first processor is replaced by another,
+            // the flow path will have a different id, and the old path is logically deleted.
+            final NiFiFlowPath path = nifiFlow.getOrCreateFlowPath(startPid);
+            traverse(nifiFlow, path, startPid);
+        });
+
+        // A path named after a processor takes that processor's group id;
+        // every other path is assigned the root process group id.
+        nifiFlow.getFlowPaths().values().forEach(path -> {
+            if (processors.containsKey(path.getId())) {
+                final ProcessorStatus processor = processors.get(path.getId());
+                path.setGroupId(processor.getGroupId());
+            } else {
+                path.setGroupId(rootProcessGroupId);
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java
new file mode 100644
index 0000000..2ecec97
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.updateMetadata;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+
+/**
+ * A flow path: an ordered list of process component ids together with the
+ * sets of DataSet ids it reads from and writes to. Two flow paths are equal
+ * when their ids are equal.
+ */
+public class NiFiFlowPath implements AtlasProcess {
+    private final List<String> processComponentIds = new ArrayList<>();
+
+    private final String id;
+    private final Set<AtlasObjectId> inputs = new HashSet<>();
+    private final Set<AtlasObjectId> outputs = new HashSet<>();
+
+    private String atlasGuid;
+    private String name;
+    private String groupId;
+
+    private AtlasEntity exEntity;
+
+    private AtomicBoolean metadataUpdated = new AtomicBoolean(false);
+    private List<String> updateAudit = new ArrayList<>();
+    // GUIDs of the inputs/outputs captured when change tracking started;
+    // null until startTrackingChanges has been called.
+    private Set<String> existingInputGuids;
+    private Set<String> existingOutputGuids;
+
+
+    /**
+     * @param id id of this flow path, used as is
+     */
+    public NiFiFlowPath(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Creates a flow path whose id is {@code id + "::" + lineageHash}.
+     */
+    public NiFiFlowPath(String id, long lineageHash) {
+        this(id + "::" + lineageHash);
+    }
+
+    public AtlasEntity getExEntity() {
+        return this.exEntity;
+    }
+
+    /**
+     * Stores the given entity and takes over its GUID as the Atlas GUID.
+     */
+    public void setExEntity(AtlasEntity exEntity) {
+        this.exEntity = exEntity;
+        this.atlasGuid = this.exEntity.getGuid();
+    }
+
+    public String getAtlasGuid() {
+        return this.atlasGuid;
+    }
+
+    public void setAtlasGuid(String atlasGuid) {
+        this.atlasGuid = atlasGuid;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Passes the old and new name to {@code updateMetadata}, then stores the
+     * new name.
+     */
+    public void setName(String name) {
+        final String previous = this.name;
+        updateMetadata(metadataUpdated, updateAudit, ATTR_NAME, previous, name);
+        this.name = name;
+    }
+
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    /**
+     * Passes the old and new group id to {@code updateMetadata}, then stores
+     * the new group id.
+     */
+    public void setGroupId(String groupId) {
+        final String previous = this.groupId;
+        updateMetadata(metadataUpdated, updateAudit, "groupId", previous, groupId);
+        this.groupId = groupId;
+    }
+
+    /**
+     * Appends a component id to the end of this path.
+     */
+    public void addProcessor(String processorId) {
+        this.processComponentIds.add(processorId);
+    }
+
+    public Set<AtlasObjectId> getInputs() {
+        return this.inputs;
+    }
+
+    public Set<AtlasObjectId> getOutputs() {
+        return this.outputs;
+    }
+
+    public List<String> getProcessComponentIds() {
+        return this.processComponentIds;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * Builds a URL pointing at this path within the NiFi UI.
+     *
+     * @param nifiUrl base URL the query string is appended to
+     * @return the URL with {@code processGroupId}, plus {@code componentIds}
+     *         when the component id differs from the group id
+     */
+    public String createDeepLinkURL(String nifiUrl) {
+        // Remove lineage hash part.
+        final String componentId = id.split("::")[0];
+        if (componentId.equals(groupId)) {
+            // This path represents the root path of a process group.
+            return String.format("%s?processGroupId=%s", nifiUrl, groupId);
+        }
+        // This path represents a partial flow within a process group consists of processors.
+        return String.format("%s?processGroupId=%s&componentIds=%s", nifiUrl, groupId, componentId);
+    }
+
+    /**
+     * Start tracking changes from current state.
+     */
+    public void startTrackingChanges(NiFiFlow nifiFlow) {
+        this.metadataUpdated.set(false);
+        this.updateAudit.clear();
+
+        // Snapshot the current references before any of them is removed.
+        this.existingInputGuids = toGuids(this.inputs);
+        this.existingOutputGuids = toGuids(this.outputs);
+
+        // Remove all nifi_queues those are owned by the nifiFlow to delete ones no longer exist.
+        // Because it should be added again if not deleted when flow analysis finished.
+        final Set<AtlasObjectId> ownedQueues = nifiFlow.getQueues().keySet();
+        this.inputs.removeAll(ownedQueues);
+        this.outputs.removeAll(ownedQueues);
+    }
+
+    public boolean isMetadataUpdated() {
+        return this.metadataUpdated.get();
+    }
+
+    public List<String> getUpdateAudit() {
+        return this.updateAudit;
+    }
+
+    /**
+     * Compares the GUIDs of the given ids with the GUIDs captured when
+     * change tracking started.
+     *
+     * @param ids ids to compare
+     * @param isInput true to compare against inputs, false against outputs
+     * @return true if nothing was captured yet or the GUID sets differ
+     */
+    boolean isDataSetReferenceChanged(Set<AtlasObjectId> ids, boolean isInput) {
+        final Set<String> guids = toGuids(ids);
+        final Set<String> existingGuids = isInput ? existingInputGuids : existingOutputGuids;
+        if (existingGuids == null) {
+            return true;
+        }
+        return !existingGuids.equals(guids);
+    }
+
+    /**
+     * Maps object ids to the set of their GUIDs.
+     */
+    private static Set<String> toGuids(Set<AtlasObjectId> ids) {
+        return ids.stream().map(AtlasObjectId::getGuid).collect(Collectors.toSet());
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        text.append("NiFiFlowPath{");
+        text.append("name='").append(name).append('\'');
+        text.append(", inputs=").append(inputs);
+        text.append(", outputs=").append(outputs);
+        text.append(", processComponentIds=").append(processComponentIds);
+        text.append('}');
+        return text.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final NiFiFlowPath other = (NiFiFlowPath) o;
+        return this.id.equals(other.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return this.id.hashCode();
+    }
+}

Reply via email to