[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292582#comment-16292582 ]
ASF GitHub Bot commented on NIFI-3709: -------------------------------------- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157208859 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS; 
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL; +import static org.apache.nifi.atlas.NiFiTypes.ENTITIES; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class NiFiAtlasClient { + + private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); + + private static NiFiAtlasClient nifiClient; + private AtlasClientV2 atlasClient; + + private NiFiAtlasClient() { + super(); + } + + public static NiFiAtlasClient getInstance() { + if (nifiClient == null) { + synchronized (NiFiAtlasClient.class) { + if (nifiClient == null) { + nifiClient = new NiFiAtlasClient(); + } + } + } + return nifiClient; + } + + public void initialize(final String[] baseUrls, final AtlasAuthN authN, final File atlasConfDir) { + + synchronized (NiFiAtlasClient.class) { + + if (atlasClient != null) { + logger.info("{} had been setup but replacing it with new one.", atlasClient); + ApplicationProperties.forceReload(); + } + + if (atlasConfDir != null) { + // If atlasConfDir is not set, atlas-application.properties will be searched under classpath. 
+ Properties props = System.getProperties(); + final String atlasConfProp = "atlas.conf"; + props.setProperty(atlasConfProp, atlasConfDir.getAbsolutePath()); + logger.debug("{} has been set to: {}", atlasConfProp, props.getProperty(atlasConfProp)); + } + + atlasClient = authN.createClient(baseUrls); + + } + } + + /** + * This is an utility method to delete unused types. + * Should be used during development or testing only. + * @param typeNames to delete + */ + void deleteTypeDefs(String ... typeNames) throws AtlasServiceException { + final AtlasTypesDef existingTypeDef = getTypeDefs(typeNames); + try { + atlasClient.deleteAtlasTypeDefs(existingTypeDef); + } catch (UniformInterfaceException e) { + if (e.getResponse().getStatus() == 204) { + // 204 is a successful response. + // NOTE: However after executing this, Atlas should be restarted to work properly. + logger.info("Deleted type defs: {}", existingTypeDef); + } else { + throw e; + } + } + } + + /** + * @return True when required NiFi types are already created. + */ + public boolean isNiFiTypeDefsRegistered() throws AtlasServiceException { + final Set<String> typeNames = ENTITIES.keySet(); + final Map<String, AtlasEntityDef> existingDefs = getTypeDefs(typeNames.toArray(new String[typeNames.size()])).getEntityDefs().stream() + .collect(Collectors.toMap(AtlasEntityDef::getName, Function.identity())); + return typeNames.stream().allMatch(existingDefs::containsKey); + } + + /** + * Create or update NiFi types in Atlas type system. + * @param update If false, doesn't perform anything if there is existing type def for the name. 
+ */ + public void registerNiFiTypeDefs(boolean update) throws AtlasServiceException { + final Set<String> typeNames = ENTITIES.keySet(); + final Map<String, AtlasEntityDef> existingDefs = getTypeDefs(typeNames.toArray(new String[typeNames.size()])).getEntityDefs().stream() + .collect(Collectors.toMap(AtlasEntityDef::getName, Function.identity())); + + + final AtomicBoolean shouldUpdate = new AtomicBoolean(false); + + final AtlasTypesDef type = new AtlasTypesDef(); + + typeNames.stream().filter(typeName -> { + final AtlasEntityDef existingDef = existingDefs.get(typeName); + if (existingDef != null) { + // type is already defined. + if (!update) { + return false; + } + shouldUpdate.set(true); + } + return true; + }).forEach(typeName -> { + final NiFiTypes.EntityDefinition def = ENTITIES.get(typeName); + + final AtlasEntityDef entity = new AtlasEntityDef(); + type.getEntityDefs().add(entity); + + entity.setName(typeName); + + Set<String> superTypes = new HashSet<>(); + List<AtlasAttributeDef> attributes = new ArrayList<>(); + + def.define(entity, superTypes, attributes); + + entity.setSuperTypes(superTypes); + entity.setAttributeDefs(attributes); + }); + + // Create or Update. + final AtlasTypesDef atlasTypeDefsResult = shouldUpdate.get() + ? atlasClient.updateAtlasTypeDefs(type) + : atlasClient.createAtlasTypeDefs(type); + logger.debug("Result={}", atlasTypeDefsResult); + } + + private AtlasTypesDef getTypeDefs(String ... 
typeNames) throws AtlasServiceException { + final AtlasTypesDef typeDefs = new AtlasTypesDef(); + for (int i = 0; i < typeNames.length; i++) { + final MultivaluedMap<String, String> searchParams = new MultivaluedMapImpl(); + searchParams.add(SearchFilter.PARAM_NAME, typeNames[i]); + final AtlasTypesDef typeDef = atlasClient.getAllTypeDefs(new SearchFilter(searchParams)); + typeDefs.getEntityDefs().addAll(typeDef.getEntityDefs()); + } + logger.debug("typeDefs={}", typeDefs); + return typeDefs; + } + + private Pattern FLOW_PATH_URL_PATTERN = Pattern.compile("^http.+processGroupId=([0-9a-z\\-]+).*$"); + /** + * Fetch existing NiFiFlow entity from Atlas. + * @param rootProcessGroupId The id of a NiFi flow root process group. + * @param clusterName The cluster name of a flow. + * @return A NiFiFlow instance filled with retrieved data from Atlas. Status objects are left blank, e.g. ProcessorStatus. + * @throws AtlasServiceException Thrown if requesting to Atlas API failed, including when the flow is not found. 
+ */ + public NiFiFlow fetchNiFiFlow(String rootProcessGroupId, String clusterName) throws AtlasServiceException { + + final String qualifiedName = AtlasUtils.toQualifiedName(clusterName, rootProcessGroupId); + final AtlasObjectId flowId = new AtlasObjectId(TYPE_NIFI_FLOW, ATTR_QUALIFIED_NAME, qualifiedName); + final AtlasEntity.AtlasEntityWithExtInfo nifiFlowExt = searchEntityDef(flowId); + + if (nifiFlowExt == null || nifiFlowExt.getEntity() == null) { + return null; + } + + final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity(); + final Map<String, Object> attributes = nifiFlowEntity.getAttributes(); + final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId); + nifiFlow.setExEntity(nifiFlowEntity); + nifiFlow.setFlowName(toStr(attributes.get(ATTR_NAME))); + nifiFlow.setClusterName(clusterName); + nifiFlow.setUrl(toStr(attributes.get(ATTR_URL))); + nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION))); + + nifiFlow.getQueues().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_QUEUES)))); + nifiFlow.getRootInputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_INPUT_PORTS)))); + nifiFlow.getRootOutputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_OUTPUT_PORTS)))); + + final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths(); + final Map<AtlasObjectId, AtlasEntity> flowPathEntities = toQualifiedNameIds(toAtlasObjectIds(attributes.get(ATTR_FLOW_PATHS))); + + for (AtlasEntity flowPathEntity : flowPathEntities.values()) { + final String pathQualifiedName = toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME)); + final NiFiFlowPath flowPath = new NiFiFlowPath(getComponentIdFromQualifiedName(pathQualifiedName)); + if (flowPathEntity.hasAttribute(ATTR_URL)) { + final Matcher urlMatcher = FLOW_PATH_URL_PATTERN.matcher(toStr(flowPathEntity.getAttribute(ATTR_URL))); + if (urlMatcher.matches()) { + 
flowPath.setGroupId(urlMatcher.group(1)); + } + } + flowPath.setExEntity(flowPathEntity); + flowPath.setName(toStr(flowPathEntity.getAttribute(ATTR_NAME))); + flowPath.getInputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_INPUTS))).keySet()); + flowPath.getOutputs().addAll(toQualifiedNameIds(toAtlasObjectIds(flowPathEntity.getAttribute(ATTR_OUTPUTS))).keySet()); + flowPath.startTrackingChanges(nifiFlow); + + flowPaths.put(flowPath.getId(), flowPath); + } + + nifiFlow.startTrackingChanges(); + return nifiFlow; + } + + @SuppressWarnings("unchecked") + private List<AtlasObjectId> toAtlasObjectIds(Object _references) { + if (_references == null) { + return Collections.emptyList(); + } + List<Map<String, Object>> references = (List<Map<String, Object>>) _references; + return references.stream() + .map(ref -> new AtlasObjectId(toStr(ref.get(ATTR_GUID)), toStr(ref.get(ATTR_TYPENAME)), ref)) + .collect(Collectors.toList()); + } + + /** + * <p>AtlasObjectIds returned from Atlas have GUID, but do not have qualifiedName, while ones created by the reporting task + * do not have GUID, but qualifiedName. AtlasObjectId.equals returns false for this combination. + * In order to match ids correctly, this method converts fetches actual entities from ids to get qualifiedName attribute.</p> + * + * <p>Also, AtlasObjectIds returned from Atlas does not have entity state. + * If Atlas is configured to use soft-delete (default), deleted ids are still returned. + * Fetched entities are used to determine whether an AtlasObjectId is still active or deleted. + * Deleted entities will not be included in the result of this method. 
+ * </p> + * @param ids to convert + * @return AtlasObjectIds with qualifiedName + */ + private Map<AtlasObjectId, AtlasEntity> toQualifiedNameIds(List<AtlasObjectId> ids) { + if (ids == null) { + return Collections.emptyMap(); + } + + return ids.stream().distinct().map(id -> { + try { + final AtlasEntity.AtlasEntityWithExtInfo entityExt = searchEntityDef(id); + final AtlasEntity entity = entityExt.getEntity(); + if (AtlasEntity.Status.DELETED.equals(entity.getStatus())) { + return null; + } + final Map<String, Object> uniqueAttrs = Collections.singletonMap(ATTR_QUALIFIED_NAME, entity.getAttribute(ATTR_QUALIFIED_NAME)); + return new Tuple<>(new AtlasObjectId(id.getGuid(), id.getTypeName(), uniqueAttrs), entity); + } catch (AtlasServiceException e) { + return null; --- End diff -- Absolutely. Thanks! > Export NiFi flow dataset lineage to Apache Atlas > ------------------------------------------------ > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Reporter: Koji Kawamura > Assignee: Koji Kawamura > > While Apache NiFi has provenance and event level lineage support within its > data flow, Apache Atlas also does manage lineage between dataset and process > those interacting with such data. > It would be beneficial for users who use both NiFi and Atlas and if they can > see end-to-end data lineage on Atlas lineage graph, as some type of dataset > are processed by both NiFi and technologies around Atlas such as Storm, > Falcon or Sqoop. For example, Kafka topics and Hive tables. > In order to make this integration happen, I propose a NiFi reporting task > that analyzes NiFi flow then creates DataSet and Process entities in Atlas. > The challenge is how to design NiFi flow dataset level lineage within Atlas > lineage graph. 
> If we just add a single NiFi process and connect every DataSet from/to it, it > would be too ambiguous since it won't be clear which part of a NiFi flow > actually interacts with a certain dataset. > But if we put every NiFi processor as an independent process in Atlas, it would > be too granular, too. Also, since we already have detailed event-level lineage in > NiFi, we wouldn't need the same level in Atlas. > If we can group certain processors in a NiFi flow as a process in Atlas, it > would provide a good level of granularity. -- This message was sent by Atlassian JIRA (v6.4.14#64029)