http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java new file mode 100644 index 0000000..feb2b48 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static 
org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;

/**
 * Process-wide singleton wrapping an {@link AtlasClientV2}.
 *
 * <p>It registers the NiFi type definitions listed in {@code NiFiTypes.ENTITIES}, fetches an existing
 * {@code nifi_flow} entity together with its queues, ports and flow paths, and sends created/updated
 * entities of a {@link NiFiFlow} back to Atlas.</p>
 *
 * <p>{@link #initialize(String[], AtlasAuthN, File)} must be called before any other instance method,
 * because all of them delegate to the underlying Atlas client.</p>
 */
public class NiFiAtlasClient {

    private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);

    /**
     * Captures the {@code processGroupId} query parameter value from the URL attribute of a nifi_flow_path entity.
     * Compiled once and shared, as a Pattern is immutable.
     */
    private static final Pattern FLOW_PATH_URL_PATTERN = Pattern.compile("^http.+processGroupId=([0-9a-z\\-]+).*$");

    // Must be volatile: getInstance() reads this field outside of the lock (double-checked locking).
    private static volatile NiFiAtlasClient nifiClient;

    // Assigned under the class lock in initialize(), but read without locking by every other method.
    // volatile guarantees that a replaced client is visible to those readers.
    private volatile AtlasClientV2 atlasClient;

    private NiFiAtlasClient() {
        super();
    }

    /**
     * @return The single shared instance, created lazily on the first call.
     */
    public static NiFiAtlasClient getInstance() {
        NiFiAtlasClient instance = nifiClient;
        if (instance == null) {
            synchronized (NiFiAtlasClient.class) {
                instance = nifiClient;
                if (instance == null) {
                    instance = new NiFiAtlasClient();
                    nifiClient = instance;
                }
            }
        }
        return instance;
    }

    /**
     * (Re)creates the underlying Atlas client.
     * If a client has already been set up, Atlas application properties are force-reloaded and the client is replaced.
     *
     * @param baseUrls Atlas base URLs passed to {@link AtlasAuthN#createClient(String[])}.
     * @param authN Creates the Atlas client.
     * @param atlasConfDir If not null, its absolute path is set to the 'atlas.conf' system property.
     */
    public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) {

        synchronized (NiFiAtlasClient.class) {

            if (atlasClient != null) {
                logger.info("{} had been setup but replacing it with new one.", atlasClient);
                ApplicationProperties.forceReload();
            }

            if (atlasConfDir != null) {
                // If atlasConfDir is not set, atlas-application.properties will be searched under classpath.
                final Properties props = System.getProperties();
                final String atlasConfProp = "atlas.conf";
                props.setProperty(atlasConfProp, atlasConfDir.getAbsolutePath());
                logger.debug("{} has been set to: {}", atlasConfProp, props.getProperty(atlasConfProp));
            }

            atlasClient = authN.createClient(baseUrls);
        }
    }

    /**
     * This is a utility method to delete unused types.
     * Should be used during development or testing only.
     * @param typeNames to delete
     * @throws AtlasServiceException Thrown if requesting to Atlas API failed.
     */
    void deleteTypeDefs(String ... typeNames) throws AtlasServiceException {
        final AtlasTypesDef existingTypeDef = getTypeDefs(typeNames);
        try {
            atlasClient.deleteAtlasTypeDefs(existingTypeDef);
        } catch (UniformInterfaceException e) {
            if (e.getResponse().getStatus() == 204) {
                // 204 is a successful response.
                // NOTE: However after executing this, Atlas should be restarted to work properly.
                logger.info("Deleted type defs: {}", existingTypeDef);
            } else {
                throw e;
            }
        }
    }

    /**
     * @return True when required NiFi types are already created.
     * @throws AtlasServiceException Thrown if requesting to Atlas API failed.
     */
    public boolean isNiFiTypeDefsRegistered() throws AtlasServiceException {
        final Map<String, AtlasEntityDef> existingDefs = getExistingNiFiEntityDefs();
        return ENTITIES.keySet().stream().allMatch(existingDefs::containsKey);
    }

    /**
     * Create or update NiFi types in Atlas type system.
     * No request is sent when there is nothing to create or update.
     * @param update If false, doesn't perform anything if there is existing type def for the name.
     * @throws AtlasServiceException Thrown if requesting to Atlas API failed.
     */
    public void registerNiFiTypeDefs(boolean update) throws AtlasServiceException {
        final Set<String> typeNames = ENTITIES.keySet();
        final Map<String, AtlasEntityDef> existingDefs = getExistingNiFiEntityDefs();

        // Set to true as soon as at least one of the types to send already exists in Atlas.
        final AtomicBoolean shouldUpdate = new AtomicBoolean(false);

        final AtlasTypesDef type = new AtlasTypesDef();

        typeNames.stream().filter(typeName -> {
            final AtlasEntityDef existingDef = existingDefs.get(typeName);
            if (existingDef != null) {
                // type is already defined.
                if (!update) {
                    return false;
                }
                shouldUpdate.set(true);
            }
            return true;
        }).forEach(typeName -> {
            final NiFiTypes.EntityDefinition def = ENTITIES.get(typeName);

            final AtlasEntityDef entity = new AtlasEntityDef();
            type.getEntityDefs().add(entity);

            entity.setName(typeName);

            final Set<String> superTypes = new HashSet<>();
            final List<AtlasAttributeDef> attributes = new ArrayList<>();

            def.define(entity, superTypes, attributes);

            entity.setSuperTypes(superTypes);
            entity.setAttributeDefs(attributes);
        });

        if (type.getEntityDefs().isEmpty()) {
            // Every type is already defined and update is not requested, do not send an empty request.
            logger.debug("No NiFi type def needs to be created or updated.");
            return;
        }

        // Create or Update.
        final AtlasTypesDef atlasTypeDefsResult = shouldUpdate.get()
                ? atlasClient.updateAtlasTypeDefs(type)
                : atlasClient.createAtlasTypeDefs(type);
        logger.debug("Result={}", atlasTypeDefsResult);
    }

    /**
     * Fetch entity defs from Atlas for the type names defined in {@code NiFiTypes.ENTITIES}.
     * @return Existing entity defs keyed by its name.
     */
    private Map<String, AtlasEntityDef> getExistingNiFiEntityDefs() throws AtlasServiceException {
        final Set<String> typeNames = ENTITIES.keySet();
        return getTypeDefs(typeNames.toArray(new String[typeNames.size()])).getEntityDefs().stream()
                .collect(Collectors.toMap(AtlasEntityDef::getName, Function.identity()));
    }

    /**
     * Search type defs by name, one request per name, and gather the returned entity defs into a single AtlasTypesDef.
     */
    private AtlasTypesDef getTypeDefs(String ... typeNames) throws AtlasServiceException {
        final AtlasTypesDef typeDefs = new AtlasTypesDef();
        for (final String typeName : typeNames) {
            final MultivaluedMap<String, String> searchParams = new MultivaluedMapImpl();
            searchParams.add(SearchFilter.PARAM_NAME, typeName);
            final AtlasTypesDef typeDef = atlasClient.getAllTypeDefs(new SearchFilter(searchParams));
            typeDefs.getEntityDefs().addAll(typeDef.getEntityDefs());
        }
        logger.debug("typeDefs={}", typeDefs);
        return typeDefs;
    }

    /**
     * Fetch existing NiFiFlow entity from Atlas.
     * @param rootProcessGroupId The id of a NiFi flow root process group.
     * @param clusterName The cluster name of a flow.
     * @return A NiFiFlow instance filled with retrieved data from Atlas. Status objects are left blank, e.g. ProcessorStatus.
     *          Null is returned if the search result does not contain an entity.
     * @throws AtlasServiceException Thrown if requesting to Atlas API failed, including when the flow is not found.
     */
    public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException {

        final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId);
        final AtlasObjectId flowId = new AtlasObjectId(TYPE_NIFI_FLOW, ATTR_QUALIFIED_NAME, qualifiedName);
        final AtlasEntity.AtlasEntityWithExtInfo nifiFlowExt = searchEntityDef(flowId);

        if (nifiFlowExt == null || nifiFlowExt.getEntity() == null) {
            return null;
        }

        final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity();
        final Map<String, Object> attributes = nifiFlowEntity.getAttributes();
        final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
        nifiFlow.setExEntity(nifiFlowEntity);
        nifiFlow.setFlowName(toStr(attributes.get(ATTR_NAME)));
        nifiFlow.setClusterName(clusterName);
        nifiFlow.setUrl(toStr(attributes.get(ATTR_URL)));
        nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION)));

        nifiFlow.getQueues().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_QUEUES))));
        nifiFlow.getRootInputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_INPUT_PORTS))));
        nifiFlow.getRootOutputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_OUTPUT_PORTS))));

        final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths();
        final Map<AtlasObjectId, AtlasEntity> flowPathEntities = toQualifiedNameIds(toAtlasObjectIds(attributes.get(ATTR_FLOW_PATHS)));

        for (AtlasEntity flowPathEntity : flowPathEntities.values()) {
            final String pathQualifiedName = toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME));
            final NiFiFlowPath flowPath = new NiFiFlowPath(getComponentIdFromQualifiedName(pathQualifiedName));
            if (flowPathEntity.hasAttribute(ATTR_URL)) {
                // The group id is only available as a part of the URL attribute.
                final Matcher urlMatcher = FLOW_PATH_URL_PATTERN.matcher(toStr(flowPathEntity.getAttribute(ATTR_URL)));
                if (urlMatcher.matches()) {
                    flowPath.setGroupId(urlMatcher.group(1));
                }
            }
            flowPath.setExEntity(flowPathEntity);
            flowPath.setName(toStr(flowPathEntity.getAttribute(ATTR_NAME)));
            flowPath.getInputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_INPUTS))).keySet());
            flowPath.getOutputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_OUTPUTS))).keySet());
            flowPath.startTrackingChanges(nifiFlow);

            flowPaths.put(flowPath.getId(), flowPath);
        }

        nifiFlow.startTrackingChanges();
        return nifiFlow;
    }

    /**
     * Convert a reference collection attribute value into AtlasObjectIds.
     * @param _references Expected to be a list of maps having guid and typeName entries, or null.
     * @return AtlasObjectIds created from the references, an empty list if the argument is null.
     */
    @SuppressWarnings("unchecked")
    private List<AtlasObjectId> toAtlasObjectIds(Object _references) {
        if (_references == null) {
            return Collections.emptyList();
        }
        final List<Map<String, Object>> references = (List<Map<String, Object>>) _references;
        return references.stream()
                .map(ref -> new AtlasObjectId(toStr(ref.get(ATTR_GUID)), toStr(ref.get(ATTR_TYPENAME)), ref))
                .collect(Collectors.toList());
    }

    /**
     * <p>AtlasObjectIds returned from Atlas have GUID, but do not have qualifiedName, while ones created by the reporting task
     * do not have GUID, but qualifiedName. AtlasObjectId.equals returns false for this combination.
     * In order to match ids correctly, this method fetches actual entities from ids to get qualifiedName attribute.</p>
     *
     * <p>Also, AtlasObjectIds returned from Atlas do not have entity state.
     * If Atlas is configured to use soft-delete (default), deleted ids are still returned.
     * Fetched entities are used to determine whether an AtlasObjectId is still active or deleted.
     * Deleted entities will not be included in the result of this method.
     * Ids whose entity could not be fetched are logged and not included in the result, either.
     * </p>
     * @param ids to convert
     * @return AtlasObjectIds with qualifiedName
     */
    private Map<AtlasObjectId, AtlasEntity> toQualifiedNameIds(List<AtlasObjectId> ids) {
        if (ids == null) {
            return Collections.emptyMap();
        }

        return ids.stream().distinct()
                .map(this::toQualifiedNameId)
                .filter(Objects::nonNull)
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
    }

    /**
     * Fetch the entity of the specified id, and pair it with an AtlasObjectId having qualifiedName.
     * @return Null if the entity is deleted, not returned by Atlas, or the request failed.
     */
    private Tuple<AtlasObjectId, AtlasEntity> toQualifiedNameId(final AtlasObjectId id) {
        try {
            final AtlasEntity.AtlasEntityWithExtInfo entityExt = searchEntityDef(id);
            final AtlasEntity entity = entityExt != null ? entityExt.getEntity() : null;
            if (entity == null) {
                logger.debug("No entity was returned for id {}", id);
                return null;
            }
            if (AtlasEntity.Status.DELETED.equals(entity.getStatus())) {
                return null;
            }
            final Map<String, Object> uniqueAttrs = Collections.singletonMap(ATTR_QUALIFIED_NAME, entity.getAttribute(ATTR_QUALIFIED_NAME));
            return new Tuple<>(new AtlasObjectId(id.getGuid(), id.getTypeName(), uniqueAttrs), entity);
        } catch (AtlasServiceException e) {
            // The exception is passed twice: once to fill the placeholder, once to be logged as a Throwable.
            logger.warn("Failed to search entity by id {}, due to {}", id, e, e);
            return null;
        }
    }

    /**
     * Register the specified NiFiFlow to Atlas: its nifi_flow entity, changed DataSet entities and changed flow path entities.
     * The nifi_flow entity is sent again at the end only if anything is created, updated or removed.
     * @param nifiFlow The flow to register.
     * @throws AtlasServiceException Thrown if requesting to Atlas API failed.
     */
    public void registerNiFiFlow(NiFiFlow nifiFlow) throws AtlasServiceException {

        // Create parent flow entity, so that common properties are taken over.
        final AtlasEntity flowEntity = registerNiFiFlowEntity(nifiFlow);

        // Create DataSet entities that are created by this NiFi flow.
        final Map<String, List<AtlasEntity>> updatedDataSetEntities = registerDataSetEntities(nifiFlow);

        // Create path entities.
        final Set<AtlasObjectId> remainingPathIds = registerFlowPathEntities(nifiFlow);

        // Update these attributes only if anything is created, updated or removed.
        boolean shouldUpdateNiFiFlow = nifiFlow.isMetadataUpdated();
        if (remainingPathIds != null) {
            flowEntity.setAttribute(ATTR_FLOW_PATHS, remainingPathIds);
            shouldUpdateNiFiFlow = true;
        }
        if (updatedDataSetEntities.containsKey(TYPE_NIFI_QUEUE)) {
            flowEntity.setAttribute(ATTR_QUEUES, updatedDataSetEntities.get(TYPE_NIFI_QUEUE));
            shouldUpdateNiFiFlow = true;
        }
        if (updatedDataSetEntities.containsKey(TYPE_NIFI_INPUT_PORT)) {
            flowEntity.setAttribute(ATTR_INPUT_PORTS, updatedDataSetEntities.get(TYPE_NIFI_INPUT_PORT));
            shouldUpdateNiFiFlow = true;
        }
        if (updatedDataSetEntities.containsKey(TYPE_NIFI_OUTPUT_PORT)) {
            flowEntity.setAttribute(ATTR_OUTPUT_PORTS, updatedDataSetEntities.get(TYPE_NIFI_OUTPUT_PORT));
            shouldUpdateNiFiFlow = true;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("### NiFi Flow Audit Logs START");
            nifiFlow.getUpdateAudit().forEach(logger::debug);
            nifiFlow.getFlowPaths().forEach((k, v) -> {
                logger.debug("--- NiFiFlowPath Audit Logs: {}", k);
                v.getUpdateAudit().forEach(logger::debug);
            });
            logger.debug("### NiFi Flow Audit Logs END");
        }

        if (!shouldUpdateNiFiFlow) {
            return;
        }

        // Send updated entities.
        final List<AtlasEntity> entities = new ArrayList<>();
        final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
        entities.add(flowEntity);
        try {
            final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
            logger.debug("mutation response={}", mutationResponse);
        } catch (AtlasServiceException e) {
            if (!isInstanceNotFound(e)) {
                throw e;
            }
            // NOTE: If previously existed nifi_flow_path entity is removed because the path is removed from NiFi,
            // then Atlas respond with 404 even though the entity is successfully updated.
            // Following exception is thrown in this case. Just log it.
            // org.apache.atlas.AtlasServiceException:
            // Metadata service API org.apache.atlas.AtlasBaseClient$APIInfo@45a37759
            // failed with status 404 (Not Found) Response Body
            // ({"errorCode":"ATLAS-404-00-00B","errorMessage":"Given instance is invalid/not found:
            // Could not find entities in the repository with guids: [96d24487-cd66-4795-b552-f00b426fed26]"})
            logger.debug("Received error response from Atlas but it should be stored. {}", e.toString());
        }
    }

    /**
     * @return True if the exception carries the HTTP status and the error code of AtlasErrorCode.INSTANCE_NOT_FOUND.
     *          False if the exception does not have a status or a message, so that the original exception gets re-thrown
     *          by the caller instead of being masked by a NullPointerException.
     */
    private static boolean isInstanceNotFound(final AtlasServiceException e) {
        if (e.getStatus() == null || e.getMessage() == null) {
            return false;
        }
        return e.getStatus().getStatusCode() == AtlasErrorCode.INSTANCE_NOT_FOUND.getHttpCode().getStatusCode()
                && e.getMessage().contains(AtlasErrorCode.INSTANCE_NOT_FOUND.getErrorCode());
    }

    /**
     * Build the nifi_flow entity from the specified NiFiFlow, and store it to Atlas if it is not persisted yet.
     * @return The existing entity as is if the flow metadata is not updated, otherwise a nifi_flow entity
     *          reflecting the current flow metadata.
     */
    private AtlasEntity registerNiFiFlowEntity(final NiFiFlow nifiFlow) throws AtlasServiceException {

        if (!nifiFlow.isMetadataUpdated()) {
            // Nothing has been changed, return existing entity.
            return nifiFlow.getExEntity();
        }

        // Create parent flow entity using existing NiFiFlow entity if available, so that common properties are taken over.
        final AtlasEntity flowEntity = nifiFlow.getExEntity() != null ? new AtlasEntity(nifiFlow.getExEntity()) : new AtlasEntity();
        flowEntity.setTypeName(TYPE_NIFI_FLOW);
        flowEntity.setVersion(1L);
        flowEntity.setAttribute(ATTR_NAME, nifiFlow.getFlowName());
        flowEntity.setAttribute(ATTR_QUALIFIED_NAME, nifiFlow.toQualifiedName(nifiFlow.getRootProcessGroupId()));
        flowEntity.setAttribute(ATTR_URL, nifiFlow.getUrl());
        flowEntity.setAttribute(ATTR_DESCRIPTION, nifiFlow.getDescription());

        // If flowEntity is not persisted yet, then store nifi_flow entity to make nifiFlowId available for other entities.
        if (flowEntity.getGuid().startsWith("-")) {
            final List<AtlasEntity> entities = new ArrayList<>();
            entities.add(flowEntity);
            final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(entities);
            final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
            logger.debug("Registered a new nifi_flow entity, mutation response={}", mutationResponse);
            final String assignedNiFiFlowGuid = mutationResponse.getGuidAssignments().get(flowEntity.getGuid());
            flowEntity.setGuid(assignedNiFiFlowGuid);
            nifiFlow.setAtlasGuid(assignedNiFiFlowGuid);
        }

        return flowEntity;
    }

    /**
     * Register DataSet within specified NiFiFlow.
     * @return Remaining (not deleted) entities keyed by Atlas type name, only for the types having any change.
     *          A type whose entities are all deleted is mapped to an empty list.
     */
    private Map<String, List<AtlasEntity>> registerDataSetEntities(final NiFiFlow nifiFlow) throws AtlasServiceException {

        final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> changedEntities = nifiFlow.getChangedDataSetEntities();

        if (changedEntities.containsKey(CREATED)) {
            final List<AtlasEntity> createdEntities = changedEntities.get(CREATED);
            final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(createdEntities);
            final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
            logger.debug("Created DataSet entities mutation response={}", mutationResponse);

            final Map<String, String> guidAssignments = mutationResponse.getGuidAssignments();
            for (AtlasEntity entity : createdEntities) {

                final String guid = guidAssignments.get(entity.getGuid());
                final String qualifiedName = toStr(entity.getAttribute(ATTR_QUALIFIED_NAME));
                if (StringUtils.isEmpty(guid)) {
                    logger.warn("GUID was not assigned for {}::{} for some reason.", entity.getTypeName(), qualifiedName);
                    continue;
                }

                final Map<AtlasObjectId, AtlasEntity> entityMap;
                switch (entity.getTypeName()) {
                    case TYPE_NIFI_INPUT_PORT:
                        entityMap = nifiFlow.getRootInputPortEntities();
                        break;
                    case TYPE_NIFI_OUTPUT_PORT:
                        entityMap = nifiFlow.getRootOutputPortEntities();
                        break;
                    case TYPE_NIFI_QUEUE:
                        entityMap = nifiFlow.getQueues();
                        break;
                    default:
                        throw new RuntimeException(entity.getTypeName() + " is not expected.");
                }

                // In order to replace the id, remove current id which does not have GUID.
                findIdByQualifiedName(entityMap.keySet(), qualifiedName).ifPresent(entityMap::remove);
                entity.setGuid(guid);
                final AtlasObjectId idWithGuid = new AtlasObjectId(guid, entity.getTypeName(), Collections.singletonMap(ATTR_QUALIFIED_NAME, qualifiedName));
                entityMap.put(idWithGuid, entity);
            }
        }

        if (changedEntities.containsKey(UPDATED)) {
            final List<AtlasEntity> updatedEntities = changedEntities.get(UPDATED);
            final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(updatedEntities);
            final EntityMutationResponse mutationResponse = atlasClient.updateEntities(atlasEntities);
            logger.debug("Updated DataSet entities mutation response={}", mutationResponse);
        }

        final Set<String> changedTypeNames = changedEntities.entrySet().stream()
                .filter(entry -> !AS_IS.equals(entry.getKey()))
                .flatMap(entry -> entry.getValue().stream())
                .map(AtlasEntity::getTypeName)
                .collect(Collectors.toSet());

        // NOTE: Cascading DELETE will be performed when parent NiFiFlow is updated without removed DataSet entities.
        final Map<String, List<AtlasEntity>> remainingEntitiesByType = changedEntities.entrySet().stream()
                .filter(entry -> !DELETED.equals(entry.getKey()))
                .flatMap(entry -> entry.getValue().stream())
                .filter(entity -> changedTypeNames.contains(entity.getTypeName()))
                .collect(Collectors.groupingBy(AtlasEntity::getTypeName));

        // If all entities are deleted for a type (e.g. nifi_input_port), then remainingEntitiesByType will not contain such key.
        // If the returning map does not contain anything for a type, then the corresponding attribute will not be updated.
        // To empty an attribute when all of its elements are deleted, add empty list for a type.
        changedTypeNames.forEach(changedTypeName -> remainingEntitiesByType.computeIfAbsent(changedTypeName, k -> Collections.emptyList()));
        return remainingEntitiesByType;
    }

    /**
     * Register created and updated flow path entities within specified NiFiFlow.
     * @return Ids of the remaining (not deleted) flow paths if there is any change,
     *          or null if nothing has been changed so that the caller can leave the flow paths attribute untouched.
     */
    private Set<AtlasObjectId> registerFlowPathEntities(final NiFiFlow nifiFlow) throws AtlasServiceException {

        final Map<NiFiFlow.EntityChangeType, List<AtlasEntity>> changedEntities = nifiFlow.getChangedFlowPathEntities();

        if (changedEntities.containsKey(CREATED)) {
            final List<AtlasEntity> createdEntities = changedEntities.get(CREATED);
            final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(createdEntities);
            final EntityMutationResponse mutationResponse = atlasClient.createEntities(atlasEntities);
            logger.debug("Created FlowPath entities mutation response={}", mutationResponse);

            final Map<String, String> guidAssignments = mutationResponse.getGuidAssignments();
            createdEntities.forEach(entity -> {
                // Replace the GUID sent with the request by the one assigned by Atlas.
                final String guid = entity.getGuid();
                entity.setGuid(guidAssignments.get(guid));
                final String pathId = getComponentIdFromQualifiedName(toStr(entity.getAttribute(ATTR_QUALIFIED_NAME)));
                final NiFiFlowPath path = nifiFlow.getFlowPaths().get(pathId);
                path.setExEntity(entity);
            });
        }

        if (changedEntities.containsKey(UPDATED)) {
            final List<AtlasEntity> updatedEntities = changedEntities.get(UPDATED);
            final AtlasEntity.AtlasEntitiesWithExtInfo atlasEntities = new AtlasEntity.AtlasEntitiesWithExtInfo(updatedEntities);
            final EntityMutationResponse mutationResponse = atlasClient.updateEntities(atlasEntities);
            logger.debug("Updated FlowPath entities mutation response={}", mutationResponse);
            updatedEntities.forEach(entity -> {
                final String pathId = getComponentIdFromQualifiedName(toStr(entity.getAttribute(ATTR_QUALIFIED_NAME)));
                final NiFiFlowPath path = nifiFlow.getFlowPaths().get(pathId);
                path.setExEntity(entity);
            });
        }

        if (NiFiFlow.EntityChangeType.containsChange(changedEntities.keySet())) {
            return changedEntities.entrySet().stream()
                    .filter(entry -> !DELETED.equals(entry.getKey()))
                    .flatMap(entry -> entry.getValue().stream())
                    .map(path -> new AtlasObjectId(path.getGuid(), TYPE_NIFI_FLOW_PATH,
                            Collections.singletonMap(ATTR_QUALIFIED_NAME, path.getAttribute(ATTR_QUALIFIED_NAME))))
                    .collect(Collectors.toSet());
        }

        return null;
    }

    /**
     * Fetch an entity from Atlas by the specified id.
     * The entity is looked up by GUID if the id has one, otherwise by type name and non-null unique attributes.
     * @param id The id of an entity to fetch.
     * @return The entity returned by the Atlas client.
     * @throws AtlasServiceException Thrown if requesting to Atlas API failed.
     */
    public AtlasEntity.AtlasEntityWithExtInfo searchEntityDef(AtlasObjectId id) throws AtlasServiceException {
        final String guid = id.getGuid();
        if (!StringUtils.isEmpty(guid)) {
            return atlasClient.getEntityByGuid(guid);
        }
        final Map<String, String> attributes = new HashMap<>();
        final Map<String, Object> uniqueAttributes = id.getUniqueAttributes();
        // An id may have neither GUID nor unique attributes, search with no attribute in that case.
        if (uniqueAttributes != null) {
            uniqueAttributes.entrySet().stream().filter(entry -> entry.getValue() != null)
                    .forEach(entry -> attributes.put(entry.getKey(), entry.getValue().toString()));
        }
        return atlasClient.getEntityByAttribute(id.getTypeName(), attributes);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java new file mode 100644 index 0000000..43fefff --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. 
+ */
public class NiFiAtlasHook extends AtlasHook implements LineageContext {

    public static final String NIFI_USER = "nifi";

    private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasHook.class);
    private static final String CONF_PREFIX = "atlas.hook.nifi.";
    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";

    private final NiFiAtlasClient atlasClient;

    /**
     * An index to resolve a qualifiedName from a GUID.
     */
    private final Map<String, String> guidToQualifiedName;
    /**
     * An index to resolve a Referenceable from a typeName::qualifiedName.
     */
    private final Map<String, Referenceable> typedQualifiedNameToRef;

    /**
     * Messages queued through {@link #addMessage} until the next {@link #commitMessages()} call.
     */
    private final List<HookNotificationMessage> messages = new ArrayList<>();

    /**
     * Creates a bounded, access-ordered map that evicts its eldest entry once it grows beyond {@code maxSize}.
     * @param maxSize maximum number of entries to retain; also used as the initial capacity
     * @return a new, empty cache map
     */
    private static <K, V> Map<K, V> createCache(final int maxSize) {
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    public NiFiAtlasHook(NiFiAtlasClient atlasClient) {
        this.atlasClient = atlasClient;

        final int qualifiedNameCacheSize = 10_000;
        this.guidToQualifiedName = createCache(qualifiedNameCacheSize);

        final int dataSetRefCacheSize = 1_000;
        this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
    }

    @Override
    protected String getNumberOfRetriesPropertyKey() {
        return HOOK_NUM_RETRIES;
    }

    /**
     * Queues a message. Nothing is sent until {@link #commitMessages()} is called.
     */
    @Override
    public void addMessage(HookNotificationMessage message) {
        messages.add(message);
    }

    /**
     * Counters collected during a single {@link #commitMessages()} run, reported at debug level.
     */
    private class Metrics {
        final long startedAt = System.currentTimeMillis();
        int totalMessages;
        int partialNiFiFlowPathUpdates;
        int dedupedPartialNiFiFlowPathUpdates;
        int otherMessages;
        int flowPathSearched;
        int dataSetSearched;
        int dataSetCacheHit;

        private void log(String message) {
            // Formatting is not free; skip it entirely unless the result is going to be written.
            if (!logger.isDebugEnabled()) {
                return;
            }
            logger.debug(String.format("%s, %d ms passed, totalMessages=%d,"
                            + " partialNiFiFlowPathUpdates=%d, dedupedPartialNiFiFlowPathUpdates=%d, otherMessage=%d,"
                            + " flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s,"
                            + " guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d",
                    message, System.currentTimeMillis() - startedAt, totalMessages,
                    partialNiFiFlowPathUpdates, dedupedPartialNiFiFlowPathUpdates, otherMessages,
                    flowPathSearched, dataSetSearched, dataSetCacheHit,
                    guidToQualifiedName.size(), typedQualifiedNameToRef.size()));
        }
    }

    /**
     * Tells whether a message is a partial update of a nifi_flow_path addressed by its qualifiedName.
     * Such messages are consolidated per flow path before being sent.
     */
    private static boolean isPartialNiFiFlowPathUpdate(HookNotificationMessage msg) {
        if (!ENTITY_PARTIAL_UPDATE.equals(msg.getType())) {
            return false;
        }
        final EntityPartialUpdateRequest request = (EntityPartialUpdateRequest) msg;
        return TYPE_NIFI_FLOW_PATH.equals(request.getTypeName())
                && ATTR_QUALIFIED_NAME.equals(request.getAttribute());
    }

    /**
     * Sends all queued messages and clears the queue.
     * <p>Messages other than nifi_flow_path partial updates are sent first, as they are.
     * nifi_flow_path partial updates are grouped by flow path qualifiedName and merged into a single message
     * per flow path, which also carries the inputs/outputs the flow path already has in Atlas.</p>
     * <p>The queue is cleared even if sending fails.</p>
     */
    public void commitMessages() {
        // partitioningBy always yields both keys, so neither list can be null.
        final Map<Boolean, List<HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers
                = messages.stream().collect(Collectors.partitioningBy(NiFiAtlasHook::isPartialNiFiFlowPathUpdate));

        final List<HookNotificationMessage> otherMessages = partialNiFiFlowPathUpdateAndOthers.get(false);
        final List<HookNotificationMessage> partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdateAndOthers.get(true);
        logger.info("Commit messages: {} partialNiFiFlowPathUpdate and {} other messages.", partialNiFiFlowPathUpdates.size(), otherMessages.size());

        final Metrics metrics = new Metrics();
        metrics.totalMessages = messages.size();
        metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
        metrics.otherMessages = otherMessages.size();

        try {
            // Notify other messages first.
            notifyEntities(otherMessages);

            // De-duplicate messages: group by nifi_flow_path qualifiedName value, then merge each group into one.
            final Map<String, List<EntityPartialUpdateRequest>> updatesByFlowPath = partialNiFiFlowPathUpdates.stream()
                    .map(msg -> (EntityPartialUpdateRequest) msg)
                    .collect(Collectors.groupingBy(EntityPartialUpdateRequest::getAttributeValue));

            final List<HookNotificationMessage> deduplicatedMessages = updatesByFlowPath.entrySet().stream()
                    .map(entry -> consolidateFlowPathUpdates(entry.getKey(), entry.getValue(), metrics))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

            metrics.dedupedPartialNiFiFlowPathUpdates = deduplicatedMessages.size();
            notifyEntities(deduplicatedMessages);
        } finally {
            metrics.log("Committed");
            messages.clear();
        }
    }

    /**
     * Merges every partial update targeting one nifi_flow_path into a single update message.
     * @param flowPathQualifiedName qualifiedName of the target nifi_flow_path
     * @param updates all queued partial updates for that flow path
     * @param metrics counters to update
     * @return the consolidated message, or null if the existing nifi_flow_path could not be retrieved
     */
    private HookNotificationMessage consolidateFlowPathUpdates(String flowPathQualifiedName,
                                                               List<EntityPartialUpdateRequest> updates,
                                                               Metrics metrics) {
        final Map<String, Referenceable> distinctInputs;
        final Map<String, Referenceable> distinctOutputs;
        final String flowPathGuid;
        try {
            // Fetch existing nifi_flow_path and its inputs/outputs.
            metrics.flowPathSearched++;
            final AtlasEntity.AtlasEntityWithExtInfo flowPathExt
                    = atlasClient.searchEntityDef(new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName));
            final AtlasEntity flowPathEntity = flowPathExt.getEntity();
            flowPathGuid = flowPathEntity.getGuid();
            distinctInputs = toReferenceables(flowPathEntity.getAttribute(ATTR_INPUTS), metrics);
            distinctOutputs = toReferenceables(flowPathEntity.getAttribute(ATTR_OUTPUTS), metrics);

        } catch (AtlasServiceException e) {
            if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
                logger.debug("nifi_flow_path was not found for qualifiedName {}", flowPathQualifiedName);
            } else {
                logger.warn("Failed to retrieve nifi_flow_path with qualifiedName {} due to {}", flowPathQualifiedName, e, e);
            }
            return null;
        }

        // Merge all inputs and outputs for this nifi_flow_path. References already known win over new ones.
        for (EntityPartialUpdateRequest msg : updates) {
            fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics).forEach(distinctInputs::putIfAbsent);
            fromReferenceable(msg.getEntity().get(ATTR_OUTPUTS), metrics).forEach(distinctOutputs::putIfAbsent);
        }

        // Consolidate messages into one.
        final Referenceable flowPathRef = new Referenceable(flowPathGuid, TYPE_NIFI_FLOW_PATH, null);
        // NOTE: distinctInputs.values() returns HashMap$Values, which causes following error. To avoid that, wrap with ArrayList:
        // org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable
        flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values()));
        flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values()));
        return new EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
                ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef);
    }

    /**
     * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable keyed by qualifiedName.</p>
     * <p>Atlas removes existing references that are not specified when a collection attribute is updated.
     * In order to preserve existing DataSet references, existing elements should be passed within a partial update message.</p>
     * <p>This method also populates the entity caches for subsequent lookups.</p>
     * @param _refs Contains references from an existing nifi_flow_path entity inputs or outputs attribute.
     * @param metrics counters to update
     * @return A mutable map of Referenceables keyed by qualifiedName. If several references share a qualifiedName, the first one is kept.
     */
    @SuppressWarnings("unchecked")
    private Map<String, Referenceable> toReferenceables(Object _refs, Metrics metrics) {
        if (_refs == null) {
            // NOTE: This empty map may be used to add new Referenceables. Can not be Collections.emptyMap which does not support addition.
            return new HashMap<>();
        }

        final List<Map<String, Object>> refs = (List<Map<String, Object>>) _refs;
        return refs.stream().map(ref -> {
            // An existing reference should have a GUID.
            final String typeName = (String) ref.get(ATTR_TYPENAME);
            final String guid = (String) ref.get(ATTR_GUID);

            if (guidToQualifiedName.containsKey(guid)) {
                metrics.dataSetCacheHit++;
            }

            final String refQualifiedName = guidToQualifiedName.computeIfAbsent(guid, k -> {
                try {
                    metrics.dataSetSearched++;
                    final AtlasEntity.AtlasEntityWithExtInfo refExt = atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
                    final String qualifiedName = (String) refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
                    typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP));
                    return qualifiedName;
                } catch (AtlasServiceException e) {
                    if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
                        logger.warn("{} entity was not found for guid {}", typeName, guid);
                    } else {
                        logger.warn("Failed to retrieve {} with guid {} due to {}", typeName, guid, e, e);
                    }
                    return null;
                }
            });

            if (refQualifiedName == null) {
                return null;
            }

            // The two caches have different capacities, so the Referenceable may have been evicted while
            // the GUID index still knows the qualifiedName. Rebuild it rather than dropping the reference,
            // because a reference missing from the update would not be preserved.
            final Referenceable existingRef = typedQualifiedNameToRef.computeIfAbsent(
                    toTypedQualifiedName(typeName, refQualifiedName),
                    k -> new Referenceable(guid, typeName, Collections.EMPTY_MAP));
            return new Tuple<>(refQualifiedName, existingRef);
        }).filter(Objects::nonNull)
                // Callers add to the returned map, so ask for a HashMap explicitly; keep the first on key clash.
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (first, duplicate) -> first, HashMap::new));
    }

    /**
     * Convert Referenceables carried by a queued message to a map keyed by qualifiedName.
     * If the cache already holds a Referenceable for the same type and qualifiedName, the cached instance is returned instead.
     * @param _refs Contains Referenceables from a message's inputs or outputs attribute; may be null.
     * @param metrics counters to update (currently not modified by this method)
     * @return A map of Referenceables keyed by qualifiedName. If several references share a qualifiedName, the first one is kept.
     */
    @SuppressWarnings("unchecked")
    private Map<String, Referenceable> fromReferenceable(Object _refs, Metrics metrics) {
        if (_refs == null) {
            return Collections.emptyMap();
        }

        final List<Referenceable> refs = (List<Referenceable>) _refs;
        return refs.stream().map(ref -> {
            // This ref is created within this reporting cycle, and it may not have GUID assigned yet, if it is a brand new reference.
            // If cache has the Reference, then use it.
            // Brand new Referenceables have to have all mandatory attributes.
            final String typeName = ref.getTypeName();
            final Id id = ref.getId();
            final String refQualifiedName = (String) ref.get(ATTR_QUALIFIED_NAME);
            final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName);

            final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
                if (id.isAssigned()) {
                    // If this referenceable has Guid assigned, then index its GUID too.
                    guidToQualifiedName.put(id._getId(), refQualifiedName);
                }
                // computeIfAbsent stores the returned value itself. The mapping function must not
                // put into typedQualifiedNameToRef: that is a concurrent modification of the map.
                return ref;
            });

            return new Tuple<>(refQualifiedName, refFromCacheIfAvailable);
        }).collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (first, duplicate) -> first, HashMap::new));
    }

    /**
     * Closes the notification interface, if there is one.
     */
    public void close() {
        if (notificationInterface != null) {
            notificationInterface.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java new file mode 100644 index 0000000..c3920c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlow.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.isGuidAssigned; +import static org.apache.nifi.atlas.AtlasUtils.isUpdated; +import static org.apache.nifi.atlas.AtlasUtils.updateMetadata; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static 
org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; +
/**
 * In-memory model of a NiFi flow: its components, connections, root group ports, queues and flow paths,
 * together with the bookkeeping needed to tell which Atlas entities were created, updated or deleted
 * since {@link #startTrackingChanges()} was called.
 */
public class NiFiFlow {

    private static final Logger logger = LoggerFactory.getLogger(NiFiFlow.class);

    private final String rootProcessGroupId;
    private String flowName;
    private String clusterName;
    private String url;
    private String atlasGuid;
    private AtlasEntity exEntity;
    private AtlasObjectId atlasObjectId;
    private String description;

    /**
     * Track whether this instance has metadata updated and should be updated in Atlas.
     */
    private final AtomicBoolean metadataUpdated = new AtomicBoolean(false);
    private final List<String> updateAudit = new ArrayList<>();
    private final Set<String> updatedEntityGuids = new LinkedHashSet<>();
    private final Set<String> stillExistingEntityGuids = new LinkedHashSet<>();
    private final Set<String> traversedPathIds = new LinkedHashSet<>();
    private boolean urlUpdated = false;

    private final Map<String, NiFiFlowPath> flowPaths = new HashMap<>();
    private final Map<String, ProcessorStatus> processors = new HashMap<>();
    private final Map<String, RemoteProcessGroupStatus> remoteProcessGroups = new HashMap<>();
    private final Map<String, List<ConnectionStatus>> incomingConnections = new HashMap<>();
    private final Map<String, List<ConnectionStatus>> outGoingConnections = new HashMap<>();

    private final Map<AtlasObjectId, AtlasEntity> queues = new HashMap<>();
    // Any Ports.
    private final Map<String, PortStatus> inputPorts = new HashMap<>();
    private final Map<String, PortStatus> outputPorts = new HashMap<>();
    // Root Group Ports.
    private final Map<String, PortStatus> rootInputPorts = new HashMap<>();
    private final Map<String, PortStatus> rootOutputPorts = new HashMap<>();
    // Root Group Ports Entity.
    private final Map<AtlasObjectId, AtlasEntity> rootInputPortEntities = new HashMap<>();
    private final Map<AtlasObjectId, AtlasEntity> rootOutputPortEntities = new HashMap<>();


    public NiFiFlow(String rootProcessGroupId) {
        this.rootProcessGroupId = rootProcessGroupId;
    }

    public AtlasObjectId getAtlasObjectId() {
        return atlasObjectId;
    }

    public String getRootProcessGroupId() {
        return rootProcessGroupId;
    }

    public String getClusterName() {
        return clusterName;
    }

    /**
     * Sets the cluster name and rebuilds the Atlas object id, since the qualifiedName is derived from the cluster name.
     */
    public void setClusterName(String clusterName) {
        updateMetadata(metadataUpdated, updateAudit, ATTR_CLUSTER_NAME, this.clusterName, clusterName);
        this.clusterName = clusterName;
        atlasObjectId = createAtlasObjectId();
    }

    private AtlasObjectId createAtlasObjectId() {
        return new AtlasObjectId(atlasGuid, TYPE_NIFI_FLOW, Collections.singletonMap(ATTR_QUALIFIED_NAME, getQualifiedName()));
    }

    public AtlasEntity getExEntity() {
        return exEntity;
    }

    /**
     * Remembers the entity already existing in Atlas for this flow, and adopts its GUID.
     */
    public void setExEntity(AtlasEntity exEntity) {
        this.exEntity = exEntity;
        this.setAtlasGuid(exEntity.getGuid());
    }

    public String getAtlasGuid() {
        return atlasGuid;
    }

    /**
     * Sets the Atlas GUID and rebuilds the Atlas object id, which carries the GUID.
     */
    public void setAtlasGuid(String atlasGuid) {
        this.atlasGuid = atlasGuid;
        atlasObjectId = createAtlasObjectId();
    }

    public String getQualifiedName() {
        return toQualifiedName(rootProcessGroupId);
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        updateMetadata(metadataUpdated, updateAudit, ATTR_DESCRIPTION, this.description, description);
        this.description = description;
    }

    /**
     * Registers a connection under both its source id (outgoing) and its destination id (incoming).
     */
    public void addConnection(ConnectionStatus c) {
        outGoingConnections.computeIfAbsent(c.getSourceId(), k -> new ArrayList<>()).add(c);
        incomingConnections.computeIfAbsent(c.getDestinationId(), k -> new ArrayList<>()).add(c);
    }

    public void addProcessor(ProcessorStatus p) {
        processors.put(p.getId(), p);
    }

    public Map<String, ProcessorStatus> getProcessors() {
        return processors;
    }

    public void addRemoteProcessGroup(RemoteProcessGroupStatus r) {
        remoteProcessGroups.put(r.getId(), r);
    }

    public void setFlowName(String flowName) {
        updateMetadata(metadataUpdated, updateAudit, ATTR_NAME, this.flowName, flowName);
        this.flowName = flowName;
    }

    public String getFlowName() {
        return flowName;
    }

    /**
     * Sets the flow URL. A changed URL is additionally remembered in a dedicated flag,
     * because it marks existing flow paths as updated (see getFlowPathChangeType).
     */
    public void setUrl(String url) {
        updateMetadata(metadataUpdated, updateAudit, ATTR_URL, this.url, url);
        if (isUpdated(this.url, url)) {
            this.urlUpdated = true;
        }
        this.url = url;
    }

    public String getUrl() {
        return url;
    }

    /**
     * @return connections whose destination is the specified component, or null if there is none
     */
    public List<ConnectionStatus> getIncomingConnections(String componentId) {
        return incomingConnections.get(componentId);
    }

    /**
     * @return connections whose source is the specified component, or null if there is none
     */
    public List<ConnectionStatus> getOutgoingConnections(String componentId) {
        return outGoingConnections.get(componentId);
    }

    public void addInputPort(PortStatus port) {
        inputPorts.put(port.getId(), port);
    }

    public Map<String, PortStatus> getInputPorts() {
        return inputPorts;
    }

    public void addOutputPort(PortStatus port) {
        outputPorts.put(port.getId(), port);
    }

    public Map<String, PortStatus> getOutputPorts() {
        return outputPorts;
    }

    public void addRootInputPort(PortStatus port) {
        rootInputPorts.put(port.getId(), port);
        createOrUpdateRootGroupPortEntity(true, toQualifiedName(port.getId()), port.getName());
    }

    public Map<String, PortStatus> getRootInputPorts() {
        return rootInputPorts;
    }

    public void addRootOutputPort(PortStatus port) {
        rootOutputPorts.put(port.getId(), port);
        createOrUpdateRootGroupPortEntity(false, toQualifiedName(port.getId()), port.getName());
    }

    public Map<String, PortStatus> getRootOutputPorts() {
        return rootOutputPorts;
    }

    public Map<AtlasObjectId, AtlasEntity> getRootInputPortEntities() {
        return rootInputPortEntities;
    }

    /**
     * Finds the root group port entity having the qualifiedName, or creates and registers a new one.
     * An existing entity is marked as still existing, and as updated if its name differs from {@code portName}.
     * @return the existing or newly created entity
     */
    private AtlasEntity createOrUpdateRootGroupPortEntity(boolean isInput, String qualifiedName, String portName) {
        final Map<AtlasObjectId, AtlasEntity> ports = isInput ? rootInputPortEntities : rootOutputPortEntities;
        final Optional<AtlasObjectId> existingPortId = findIdByQualifiedName(ports.keySet(), qualifiedName);

        final String typeName = isInput ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;

        if (existingPortId.isPresent()) {
            final AtlasEntity entity = ports.get(existingPortId.get());
            final String portGuid = entity.getGuid();
            stillExistingEntityGuids.add(portGuid);

            final Object currentName = entity.getAttribute(ATTR_NAME);
            if (isUpdated(currentName, portName)) {
                // Update port name and set updated flag.
                entity.setAttribute(ATTR_NAME, portName);
                updatedEntityGuids.add(portGuid);
                updateAudit.add(String.format("Name of %s %s changed from %s to %s", entity.getTypeName(), portGuid, currentName, portName));
            }
            return entity;
        } else {
            final AtlasEntity entity = new AtlasEntity(typeName);

            entity.setAttribute(ATTR_NIFI_FLOW, getAtlasObjectId());
            entity.setAttribute(ATTR_NAME, portName);
            entity.setAttribute(ATTR_QUALIFIED_NAME, qualifiedName);

            final AtlasObjectId portId = new AtlasObjectId(typeName, ATTR_QUALIFIED_NAME, qualifiedName);
            ports.put(portId, entity);
            return entity;
        }
    }

    public Map<AtlasObjectId, AtlasEntity> getRootOutputPortEntities() {
        return rootOutputPortEntities;
    }

    /**
     * Finds the input queue entity of the specified destination component, or creates and registers a new one.
     * An existing queue is marked as still existing.
     * @return a tuple of the queue id and the queue entity
     */
    public Tuple<AtlasObjectId, AtlasEntity> getOrCreateQueue(String destinationComponentId) {
        final String qualifiedName = toQualifiedName(destinationComponentId);
        final Optional<AtlasObjectId> existingQueueId = findIdByQualifiedName(queues.keySet(), qualifiedName);

        if (existingQueueId.isPresent()) {
            final AtlasEntity entity = queues.get(existingQueueId.get());
            stillExistingEntityGuids.add(entity.getGuid());
            return new Tuple<>(existingQueueId.get(), entity);
        } else {
            final AtlasObjectId queueId = new AtlasObjectId(TYPE_NIFI_QUEUE, ATTR_QUALIFIED_NAME, qualifiedName);
            final AtlasEntity queue = new AtlasEntity(TYPE_NIFI_QUEUE);
            queue.setAttribute(ATTR_NIFI_FLOW, getAtlasObjectId());
            queue.setAttribute(ATTR_QUALIFIED_NAME, qualifiedName);
            queue.setAttribute(ATTR_NAME, "queue");
            queue.setAttribute(ATTR_DESCRIPTION, "Input queue for " + destinationComponentId);
            queues.put(queueId, queue);
            return new Tuple<>(queueId, queue);
        }
    }

    public Map<AtlasObjectId, AtlasEntity> getQueues() {
        return queues;
    }

    public Map<String, NiFiFlowPath> getFlowPaths() {
        return flowPaths;
    }

    /**
     * Find a flow_path that contains specified componentId.
     * @return the first matching path, or null if no path contains the component
     */
    public NiFiFlowPath findPath(String componentId) {
        for (NiFiFlowPath path : flowPaths.values()) {
            if (path.getProcessComponentIds().contains(componentId)) {
                return path;
            }
        }
        return null;
    }

    /**
     * Determine if a component should be reported as NiFiFlowPath.
     */
    public boolean isProcessComponent(String componentId) {
        return isProcessor(componentId) || isRootInputPort(componentId) || isRootOutputPort(componentId);
    }

    public boolean isProcessor(String componentId) {
        return processors.containsKey(componentId);
    }

    public boolean isInputPort(String componentId) {
        return inputPorts.containsKey(componentId);
    }

    public boolean isOutputPort(String componentId) {
        return outputPorts.containsKey(componentId);
    }

    public boolean isRootInputPort(String componentId) {
        return rootInputPorts.containsKey(componentId);
    }

    public boolean isRootOutputPort(String componentId) {
        return rootOutputPorts.containsKey(componentId);
    }

    /**
     * @return name of the processor or root group port, or "unknown" if the id is neither
     */
    public String getProcessComponentName(String componentId) {
        return getProcessComponentName(componentId, () -> "unknown");
    }

    /**
     * @param unknown supplies the name to use when the id is neither a processor nor a root group port
     * @return name of the processor or root group port, or the supplied fallback
     */
    public String getProcessComponentName(String componentId, Supplier<String> unknown) {
        if (isProcessor(componentId)) {
            return getProcessors().get(componentId).getName();
        }
        if (isRootInputPort(componentId)) {
            return getRootInputPorts().get(componentId).getName();
        }
        if (isRootOutputPort(componentId)) {
            return getRootOutputPorts().get(componentId).getName();
        }
        return unknown.get();
    }

    /**
     * Start tracking changes from current state.
     */
    public void startTrackingChanges() {
        this.metadataUpdated.set(false);
        this.updateAudit.clear();
        this.updatedEntityGuids.clear();
        this.stillExistingEntityGuids.clear();
        this.urlUpdated = false;
    }

    public boolean isMetadataUpdated() {
        return this.metadataUpdated.get();
    }

    public String toQualifiedName(String componentId) {
        return AtlasUtils.toQualifiedName(clusterName, componentId);
    }

    public enum EntityChangeType {
        AS_IS,
        CREATED,
        UPDATED,
        DELETED;

        /**
         * @return true if the collection contains any type other than AS_IS
         */
        public static boolean containsChange(Collection<EntityChangeType> types) {
            return types.contains(CREATED) || types.contains(UPDATED) || types.contains(DELETED);
        }
    }

    /**
     * Classifies a DataSet entity (root group port or queue) by its GUID.
     */
    private EntityChangeType getEntityChangeType(String guid) {
        if (!isGuidAssigned(guid)) {
            return EntityChangeType.CREATED;
        } else if (updatedEntityGuids.contains(guid)) {
            return EntityChangeType.UPDATED;
        } else if (!stillExistingEntityGuids.contains(guid)) {
            return EntityChangeType.DELETED;
        }
        return EntityChangeType.AS_IS;
    }

    /**
     * Groups root group port entities and queue entities by their change type.
     * Also appends a summary of created, updated and deleted entities to the update audit.
     */
    public Map<EntityChangeType, List<AtlasEntity>> getChangedDataSetEntities() {
        final Map<EntityChangeType, List<AtlasEntity>> changedEntities = Stream
                .of(rootInputPortEntities.values().stream(), rootOutputPortEntities.values().stream(), queues.values().stream())
                .flatMap(Function.identity())
                .collect(Collectors.groupingBy(entity -> getEntityChangeType(entity.getGuid())));
        updateAudit.add("CREATED DataSet entities=" + changedEntities.get(EntityChangeType.CREATED));
        updateAudit.add("UPDATED DataSet entities=" + changedEntities.get(EntityChangeType.UPDATED));
        updateAudit.add("DELETED DataSet entities=" + changedEntities.get(EntityChangeType.DELETED));
        return changedEntities;
    }

    /**
     * Returns the flow path registered with the id, creating it if needed, and marks the id as traversed.
     */
    public NiFiFlowPath getOrCreateFlowPath(String pathId) {
        traversedPathIds.add(pathId);
        return flowPaths.computeIfAbsent(pathId, k -> new NiFiFlowPath(pathId));
    }

    public boolean isTraversedPath(String pathId) {
        return traversedPathIds.contains(pathId);
    }

    /**
     * Classifies a flow path.
     * <p>A path which already exists in Atlas but was not traversed is DELETED, and this is decided before
     * looking at update flags: a changed flow URL applies to every path, and metadata setters may be called
     * on paths that are no longer part of the flow, so either would otherwise hide the deletion.</p>
     */
    private EntityChangeType getFlowPathChangeType(NiFiFlowPath path) {
        if (path.getExEntity() == null) {
            return EntityChangeType.CREATED;
        } else if (!traversedPathIds.contains(path.getId())) {
            return EntityChangeType.DELETED;
        } else if (path.isMetadataUpdated() || urlUpdated) {
            return EntityChangeType.UPDATED;
        }
        return EntityChangeType.AS_IS;
    }

    /**
     * Classifies an input or output reference of a flow path. Only CREATED, AS_IS or DELETED is returned.
     */
    private EntityChangeType getFlowPathIOChangeType(AtlasObjectId id) {
        final String guid = id.getGuid();
        if (!isGuidAssigned(guid)) {
            return EntityChangeType.CREATED;
        } else {
            if (TYPE_NIFI_QUEUE.equals(id.getTypeName()) && queues.containsKey(id)) {
                // If an input/output is a queue, and it is owned by this NiFiFlow, then check if it's still needed. NiFiFlow knows active queues.
                if (stillExistingEntityGuids.contains(guid)) {
                    return EntityChangeType.AS_IS;
                } else {
                    return EntityChangeType.DELETED;
                }
            } else {
                // Otherwise, do not need to delete.
                return EntityChangeType.AS_IS;
            }
        }
    }

    /**
     * Builds the Atlas entity for a flow path that is to be created, updated or kept as is.
     * @return a tuple of the final change type and the entity. An AS_IS path is promoted to UPDATED
     *         if its metadata or any of its inputs/outputs has changed.
     */
    private Tuple<EntityChangeType, AtlasEntity> toAtlasEntity(EntityChangeType changeType, final NiFiFlowPath path) {

        final AtlasEntity entity = EntityChangeType.CREATED.equals(changeType) ? new AtlasEntity() : new AtlasEntity(path.getExEntity());
        entity.setTypeName(TYPE_NIFI_FLOW_PATH);
        entity.setVersion(1L);
        entity.setAttribute(ATTR_NIFI_FLOW, getAtlasObjectId());

        // Name is the comma separated component names; description pairs each name with its component id.
        final StringBuilder name = new StringBuilder();
        final StringBuilder description = new StringBuilder();
        path.getProcessComponentIds().forEach(pid -> {
            final String componentName = getProcessComponentName(pid);

            if (name.length() > 0) {
                name.append(", ");
                description.append(", ");
            }
            name.append(componentName);
            description.append(String.format("%s::%s", componentName, pid));
        });

        path.setName(name.toString());
        entity.setAttribute(ATTR_NAME, name.toString());
        entity.setAttribute(ATTR_DESCRIPTION, description.toString());

        // Use the path id as qualifiedName.
        entity.setAttribute(ATTR_QUALIFIED_NAME, toQualifiedName(path.getId()));

        entity.setAttribute(ATTR_URL, path.createDeepLinkURL(getUrl()));

        final boolean inputsChanged = setChangedIOIds(path, entity, true);
        final boolean outputsChanged = setChangedIOIds(path, entity, false);

        // Even if there's no flow path metadata changed, if any IO is changed then the path should be updated.
        EntityChangeType finalChangeType = EntityChangeType.AS_IS.equals(changeType)
                ? (path.isMetadataUpdated() || inputsChanged || outputsChanged ? EntityChangeType.UPDATED : EntityChangeType.AS_IS)
                : changeType;

        return new Tuple<>(finalChangeType, entity);
    }

    /**
     * Set input or output DataSet ids for a NiFiFlowPath.
     * The updated ids contain only active ids.
     * @return True if there is any changed IO reference (create, update, delete).
     */
    private boolean setChangedIOIds(NiFiFlowPath path, AtlasEntity pathEntity, boolean isInput) {
        Set<AtlasObjectId> ids = isInput ? path.getInputs() : path.getOutputs();
        String targetAttribute = isInput ? ATTR_INPUTS : ATTR_OUTPUTS;
        final Map<EntityChangeType, List<AtlasObjectId>> changedIOIds
                = ids.stream().collect(Collectors.groupingBy(this::getFlowPathIOChangeType));
        // Remove DELETED references.
        final Set<AtlasObjectId> remainingFlowPathIOIds = toRemainingFlowPathIOIds(changedIOIds);

        // If references are changed, update it.
        if (path.isDataSetReferenceChanged(remainingFlowPathIOIds, isInput)) {
            pathEntity.setAttribute(targetAttribute, remainingFlowPathIOIds);
            return true;
        }
        return false;
    }

    /**
     * @return all ids except the ones classified as DELETED
     */
    private Set<AtlasObjectId> toRemainingFlowPathIOIds(Map<EntityChangeType, List<AtlasObjectId>> ids) {
        return ids.entrySet().stream()
                .filter(entry -> !EntityChangeType.DELETED.equals(entry.getKey()))
                .flatMap(entry -> entry.getValue().stream())
                .collect(Collectors.toSet());
    }

    /**
     * Converts every flow path to an Atlas entity and groups the entities by change type.
     * A DELETED path is represented by the entity it already has in Atlas.
     * Also appends a summary of created, updated and deleted paths to the update audit.
     */
    public Map<EntityChangeType, List<AtlasEntity>> getChangedFlowPathEntities() {
        // Convert NiFiFlowPath to AtlasEntity.
        final HashMap<EntityChangeType, List<AtlasEntity>> changedPaths = flowPaths.values().stream()
                .map(path -> {
                    final EntityChangeType changeType = getFlowPathChangeType(path);
                    switch (changeType) {
                        case CREATED:
                        case UPDATED:
                        case AS_IS:
                            return toAtlasEntity(changeType, path);
                        default:
                            return new Tuple<>(changeType, path.getExEntity());
                    }
                }).collect(Collectors.groupingBy(Tuple::getKey, HashMap::new, Collectors.mapping(Tuple::getValue, Collectors.toList())));

        updateAudit.add("CREATED NiFiFlowPath=" + changedPaths.get(EntityChangeType.CREATED));
        updateAudit.add("UPDATED NiFiFlowPath=" + changedPaths.get(EntityChangeType.UPDATED));
        updateAudit.add("DELETED NiFiFlowPath=" + changedPaths.get(EntityChangeType.DELETED));
        return changedPaths;
    }

    public List<String> getUpdateAudit() {
        return updateAudit;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java new file mode 100644 index 0000000..777c35a --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowAnalyzer.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +
/**
 * Populates a {@link NiFiFlow} from process group status, and breaks the flow into flow paths.
 */
public class NiFiFlowAnalyzer {

    private static final Logger logger = LoggerFactory.getLogger(NiFiFlowAnalyzer.class);

    /**
     * Registers every component and connection under the root process group, recursively,
     * then registers the ports of the root group itself as root group ports.
     */
    public void analyzeProcessGroup(NiFiFlow nifiFlow, ProcessGroupStatus rootProcessGroup) {
        analyzeProcessGroup(rootProcessGroup, nifiFlow);
        analyzeRootGroupPorts(nifiFlow, rootProcessGroup);
    }

    private void analyzeRootGroupPorts(NiFiFlow nifiFlow, ProcessGroupStatus rootProcessGroup) {
        rootProcessGroup.getInputPortStatus().forEach(nifiFlow::addRootInputPort);
        rootProcessGroup.getOutputPortStatus().forEach(nifiFlow::addRootOutputPort);
    }

    private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {

        processGroupStatus.getConnectionStatus().forEach(nifiFlow::addConnection);
        processGroupStatus.getProcessorStatus().forEach(nifiFlow::addProcessor);
        processGroupStatus.getRemoteProcessGroupStatus().forEach(nifiFlow::addRemoteProcessGroup);
        processGroupStatus.getInputPortStatus().forEach(nifiFlow::addInputPort);
        processGroupStatus.getOutputPortStatus().forEach(nifiFlow::addOutputPort);

        // Analyze child ProcessGroups recursively.
        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
            analyzeProcessGroup(child, nifiFlow);
        }

    }

    /**
     * Collects ids of the processors feeding the given connections, looking through components that are not processors.
     * The same processor id appears once per distinct route leading to it.
     */
    private List<String> getIncomingProcessorsIds(NiFiFlow nifiFlow, List<ConnectionStatus> incomingConnections) {
        return getIncomingProcessorsIds(nifiFlow, incomingConnections, new java.util.HashSet<>());
    }

    /**
     * @param visiting ids of the non-processor components on the chain currently being followed.
     *                 An id is added before following it and removed afterwards, so that only real cycles
     *                 (e.g. two funnels feeding each other) are cut, while a component reachable through
     *                 several distinct routes is still followed once per route.
     */
    private List<String> getIncomingProcessorsIds(NiFiFlow nifiFlow, List<ConnectionStatus> incomingConnections,
                                                  Set<String> visiting) {
        if (incomingConnections == null) {
            return Collections.emptyList();
        }

        final List<String> ids = new ArrayList<>();

        for (ConnectionStatus c : incomingConnections) {
            final String sourceId = c.getSourceId();
            if (sourceId.equals(c.getDestinationId())) {
                // Ignore self relationship.
                continue;
            }

            if (nifiFlow.isProcessor(sourceId)) {
                ids.add(sourceId);
            } else if (visiting.add(sourceId)) {
                ids.addAll(getIncomingProcessorsIds(nifiFlow, nifiFlow.getIncomingConnections(sourceId), visiting));
                visiting.remove(sourceId);
            }
            // Otherwise the source is already on the current chain: a cycle, nothing new to find there.
        }

        return ids;
    }

    /**
     * Finds the process components reachable from the component, looking through components that are not
     * process components. Destinations already belonging to the path are skipped.
     */
    private List<String> getNextProcessComponent(NiFiFlow nifiFlow, NiFiFlowPath path, String componentId) {
        final Set<String> visiting = new java.util.HashSet<>();
        visiting.add(componentId);
        return getNextProcessComponent(nifiFlow, path, componentId, visiting);
    }

    /**
     * @param visiting ids of the components on the chain currently being followed; used to cut cycles
     *                 made only of components that are not process components.
     */
    private List<String> getNextProcessComponent(NiFiFlow nifiFlow, NiFiFlowPath path, String componentId,
                                                 Set<String> visiting) {
        final List<ConnectionStatus> outs = nifiFlow.getOutgoingConnections(componentId);
        if (outs == null || outs.isEmpty()) {
            return Collections.emptyList();
        }

        final List<String> nextProcessComponent = new ArrayList<>();
        for (ConnectionStatus out : outs) {
            final String destinationId = out.getDestinationId();
            if (path.getProcessComponentIds().contains(destinationId)) {
                // If the connection is pointing back to current path, then skip it to avoid loop.
                continue;
            }

            if (nifiFlow.isProcessComponent(destinationId)) {
                nextProcessComponent.add(destinationId);
            } else if (visiting.add(destinationId)) {
                nextProcessComponent.addAll(getNextProcessComponent(nifiFlow, path, destinationId, visiting));
                visiting.remove(destinationId);
            }
            // Otherwise the destination is already on the current chain: a cycle, skip it.
        }
        return nextProcessComponent;
    }

    /**
     * Follows outgoing connections from the component, adding process components to the path,
     * and starting a new flow path wherever the flow forks or joins.
     */
    private void traverse(NiFiFlow nifiFlow, NiFiFlowPath path, String componentId) {

        // If the pid is RootInputPort of the same NiFi instance, then stop traversing to create separate self S2S path.
        // E.g InputPort -> MergeContent, GenerateFlowFile -> InputPort.
        if (!path.getProcessComponentIds().isEmpty() && nifiFlow.isRootInputPort(componentId)) {
            return;
        }

        // Add known inputs/outputs to/from this processor, such as RootGroupIn/Output port
        if (nifiFlow.isProcessComponent(componentId)) {
            path.addProcessor(componentId);
        }

        final List<ConnectionStatus> outs = nifiFlow.getOutgoingConnections(componentId);
        if (outs == null || outs.isEmpty()) {
            return;
        }

        // Analyze destination process components.
        final List<String> nextProcessComponents = getNextProcessComponent(nifiFlow, path, componentId);
        for (String destPid : nextProcessComponents) {
            if (path.getProcessComponentIds().contains(destPid)) {
                // Avoid loop to itself.
                continue;
            }

            // If the destination has more than one inputs, or there are multiple destinations,
            // then it should be treated as a separate flow path.
            final boolean createJointPoint = nextProcessComponents.size() > 1
                    || getIncomingProcessorsIds(nifiFlow, nifiFlow.getIncomingConnections(destPid)).size() > 1;

            if (!createJointPoint) {
                // Normal relation, continue digging.
                traverse(nifiFlow, path, destPid);
                continue;
            }

            final boolean alreadyTraversed = nifiFlow.isTraversedPath(destPid);

            // Create an input queue DataSet because Atlas doesn't show lineage if it doesn't have in and out.
            // This DataSet is also useful to link flowPaths together on Atlas lineage graph.
            final Tuple<AtlasObjectId, AtlasEntity> queueTuple = nifiFlow.getOrCreateQueue(destPid);

            final AtlasObjectId queueId = queueTuple.getKey();
            path.getOutputs().add(queueId);

            // If the destination is already traversed once, it doesn't have to be visited again.
            if (alreadyTraversed) {
                continue;
            }

            // Get existing or create new one.
            final NiFiFlowPath jointPoint = nifiFlow.getOrCreateFlowPath(destPid);
            jointPoint.getInputs().add(queueId);

            // Start traversing as a new joint point.
            traverse(nifiFlow, jointPoint, destPid);
        }
    }

    /**
     * @param ins incoming connections of the component being examined
     * @return true if no process component is found upstream of the given connections
     */
    private boolean isHeadProcessor(NiFiFlow nifiFlow, List<ConnectionStatus> ins) {
        return isHeadProcessor(nifiFlow, ins, new java.util.HashSet<>());
    }

    /**
     * @param visiting ids of the components on the chain currently being followed; used to cut cycles
     *                 made only of components that are not process components.
     */
    private boolean isHeadProcessor(NiFiFlow nifiFlow, List<ConnectionStatus> ins, Set<String> visiting) {
        if (ins == null || ins.isEmpty()) {
            return true;
        }
        for (ConnectionStatus in : ins) {
            // If it has incoming relationship from other process components, then return false.
            final String sourceId = in.getSourceId();
            if (nifiFlow.isProcessComponent(sourceId)) {
                return false;
            }
            if (!visiting.add(sourceId)) {
                // Already on the current chain: a cycle which contains no process component.
                continue;
            }
            // Check next level.
            final boolean noUpstreamProcessComponent
                    = isHeadProcessor(nifiFlow, nifiFlow.getIncomingConnections(sourceId), visiting);
            visiting.remove(sourceId);
            if (!noUpstreamProcessComponent) {
                return false;
            }
        }
        return true;
    }

    /**
     * Breaks the flow into flow paths, starting from every head processor and every root input port,
     * then assigns a process group id to each flow path.
     */
    public void analyzePaths(NiFiFlow nifiFlow) {
        final String rootProcessGroupId = nifiFlow.getRootProcessGroupId();

        // Now let's break it into flow paths.
        final Map<String, ProcessorStatus> processors = nifiFlow.getProcessors();
        final Set<String> headProcessComponents = processors.keySet().stream()
                .filter(pid -> isHeadProcessor(nifiFlow, nifiFlow.getIncomingConnections(pid)))
                .collect(Collectors.toSet());

        // Use RootInputPorts as headProcessors.
        headProcessComponents.addAll(nifiFlow.getRootInputPorts().keySet());

        for (String startPid : headProcessComponents) {
            // By using the startPid as its qualifiedName, it's guaranteed that
            // the same path will end up being the same Atlas entity.
            // However, if the first processor is replaced by another,
            // the flow path will have a different id, and the old path is logically deleted.
            final NiFiFlowPath path = nifiFlow.getOrCreateFlowPath(startPid);
            traverse(nifiFlow, path, startPid);
        }

        // A path starting with a processor belongs to that processor's group; any other path to the root group.
        for (NiFiFlowPath path : nifiFlow.getFlowPaths().values()) {
            final ProcessorStatus processor = processors.get(path.getId());
            if (processors.containsKey(path.getId())) {
                path.setGroupId(processor.getGroupId());
            } else {
                path.setGroupId(rootProcessGroupId);
            }
        }
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java new file mode 100644 index 0000000..2ecec97 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiFlowPath.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.atlas; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.updateMetadata; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; + +public class NiFiFlowPath implements AtlasProcess { + private final List<String> processComponentIds = new ArrayList<>(); + + private final String id; + private final Set<AtlasObjectId> inputs = new HashSet<>(); + private final Set<AtlasObjectId> outputs = new HashSet<>(); + + private String atlasGuid; + private String name; + private String groupId; + + private AtlasEntity exEntity; + + private AtomicBoolean metadataUpdated = new AtomicBoolean(false); + private List<String> updateAudit = new ArrayList<>(); + private Set<String> existingInputGuids; + private Set<String> existingOutputGuids; + + + public NiFiFlowPath(String id) { + this.id = id; + } + public NiFiFlowPath(String id, long lineageHash) { + this.id = id + "::" + lineageHash; + } + + public AtlasEntity getExEntity() { + return exEntity; + } + + public void setExEntity(AtlasEntity exEntity) { + this.exEntity = exEntity; + this.atlasGuid = exEntity.getGuid(); + } + + public String getAtlasGuid() { + return atlasGuid; + } + + public void setAtlasGuid(String atlasGuid) { + this.atlasGuid = atlasGuid; + } + + public String getName() { + return name; + } + + public void setName(String name) { + updateMetadata(metadataUpdated, updateAudit, ATTR_NAME, this.name, name); + this.name = name; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + updateMetadata(metadataUpdated, updateAudit, "groupId", this.groupId, groupId); + this.groupId = groupId; + } + + public void addProcessor(String processorId) { + 
processComponentIds.add(processorId); + } + + public Set<AtlasObjectId> getInputs() { + return inputs; + } + + public Set<AtlasObjectId> getOutputs() { + return outputs; + } + + public List<String> getProcessComponentIds() { + return processComponentIds; + } + + public String getId() { + return id; + } + + public String createDeepLinkURL(String nifiUrl) { + // Remove lineage hash part. + final String componentId = id.split("::")[0]; + return componentId.equals(groupId) + // This path represents the root path of a process group. + ? String.format("%s?processGroupId=%s", nifiUrl, groupId) + // This path represents a partial flow within a process group consists of processors. + : String.format("%s?processGroupId=%s&componentIds=%s", nifiUrl, groupId, componentId); + } + + /** + * Start tracking changes from current state. + */ + public void startTrackingChanges(NiFiFlow nifiFlow) { + this.metadataUpdated.set(false); + this.updateAudit.clear(); + existingInputGuids = inputs.stream().map(AtlasObjectId::getGuid).collect(Collectors.toSet()); + existingOutputGuids = outputs.stream().map(AtlasObjectId::getGuid).collect(Collectors.toSet()); + + // Remove all nifi_queues those are owned by the nifiFlow to delete ones no longer exist. + // Because it should be added again if not deleted when flow analysis finished. + final Set<AtlasObjectId> ownedQueues = nifiFlow.getQueues().keySet(); + inputs.removeAll(ownedQueues); + outputs.removeAll(ownedQueues); + } + + public boolean isMetadataUpdated() { + return this.metadataUpdated.get(); + } + + public List<String> getUpdateAudit() { + return updateAudit; + } + + boolean isDataSetReferenceChanged(Set<AtlasObjectId> ids, boolean isInput) { + final Set<String> guids = ids.stream().map(AtlasObjectId::getGuid).collect(Collectors.toSet()); + final Set<String> existingGuids = isInput ? 
existingInputGuids : existingOutputGuids; + return existingGuids == null || !existingGuids.equals(guids); + } + + @Override + public String toString() { + return "NiFiFlowPath{" + + "name='" + name + '\'' + + ", inputs=" + inputs + + ", outputs=" + outputs + + ", processComponentIds=" + processComponentIds + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + NiFiFlowPath that = (NiFiFlowPath) o; + + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } +}
