[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292728#comment-16292728 ]
ASF GitHub Bot commented on NIFI-3709: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157237190 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.LineageEdge; +import org.apache.nifi.provenance.lineage.LineageNode; +import org.apache.nifi.provenance.lineage.LineageNodeType; + +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class SimpleFlowPathLineage extends AbstractLineageStrategy { + + @Override + public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) { + final DataSetRefs refs = executeAnalyzer(analysisContext, event); + if (refs == null || (refs.isEmpty())) { + return; + } + + if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) { + processRemotePortEvent(analysisContext, nifiFlow, event, refs); + } else { + addDataSetRefs(nifiFlow, refs); + } + + } + + /** + * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received. + * Because such entity can not be created in advance while analyzing flow statically, + * as ReportingTask can not determine whether a component id is a RemoteGroupPort, + * since connectionStatus is the only available information in ReportingContext. + * ConnectionStatus only knows component id, component type is unknown. + * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort. 
+ */ + private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) { + + final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType()); + + // Create a RemoteInputPort Process. + // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid). + // See NIFI-4571 for detail. + final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next(); + final String portProcessId = event.getComponentId(); + + final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId); + remotePortProcess.setName(event.getComponentType()); + remotePortProcess.addProcessor(portProcessId); + + // For RemoteInputPort, need to find the previous component connected to this port, + // which passed this particular FlowFile. + // That is only possible by calling lineage API. + if (isRemoteInputPort) { + final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event); + if (previousEvent == null) { + logger.warn("Previous event was not found: {}", new Object[]{event}); + return; + } + + // Set groupId from incoming connection if available. + final List<ConnectionStatus> incomingRelationShips = nifiFlow.getIncomingRelationShips(portProcessId); --- End diff -- @ijokarumawak yes, there is a distinction. A 'relationship' is defined by a processor in order to indicate which route a FlowFile should take. A Connection is defined by a user when creating the dataflow. The connection consists of one or more Relationships, and multiple connections can consist of the same relationship. So when a processor transfers FlowFile A to relationship 'success' for example, it may be placed into a single Connection. Or cloned and added to 5 different Connections. 
Or it may be auto-terminated and not go into any Connections. Does that make sense? > Export NiFi flow dataset lineage to Apache Atlas > ------------------------------------------------ > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Reporter: Koji Kawamura > Assignee: Koji Kawamura > > While Apache NiFi has provenance and event-level lineage support within its > data flow, Apache Atlas also manages lineage between datasets and the processes > that interact with such data. > It would be beneficial for users of both NiFi and Atlas if they could > see end-to-end data lineage on the Atlas lineage graph, as some types of datasets > are processed by both NiFi and technologies around Atlas such as Storm, > Falcon or Sqoop; for example, Kafka topics and Hive tables. > In order to make this integration happen, I propose a NiFi reporting task > that analyzes the NiFi flow and then creates DataSet and Process entities in Atlas. > The challenge is how to design NiFi flow dataset-level lineage within the Atlas > lineage graph. > If we just add a single NiFi process and connect every DataSet from/to it, it > would be too ambiguous, since it won't be clear which part of a NiFi flow > actually interacts with a certain dataset. > But if we put every NiFi processor in Atlas as an independent process, it would > be too granular. Also, since we already have detailed event-level lineage in > NiFi, we don't need the same level of detail in Atlas. > If we can group certain processors in a NiFi flow as a process in Atlas, it > would provide a suitable granularity. -- This message was sent by Atlassian JIRA (v6.4.14#64029)