Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157237190 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.LineageEdge; +import org.apache.nifi.provenance.lineage.LineageNode; +import org.apache.nifi.provenance.lineage.LineageNodeType; + +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class SimpleFlowPathLineage extends AbstractLineageStrategy { + + @Override + public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) { + final DataSetRefs refs = executeAnalyzer(analysisContext, event); + if (refs == null || (refs.isEmpty())) { + return; + } + + if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) { + processRemotePortEvent(analysisContext, nifiFlow, event, refs); + } else { + addDataSetRefs(nifiFlow, refs); + } + + } + + /** + * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received. + * Because such entity can not be created in advance while analyzing flow statically, + * as ReportingTask can not determine whether a component id is a RemoteGroupPort, + * since connectionStatus is the only available information in ReportingContext. + * ConnectionStatus only knows component id, component type is unknown. + * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort. 
+ */ + private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) { + + final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType()); + + // Create a RemoteInputPort Process. + // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid). + // See NIFI-4571 for detail. + final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next(); + final String portProcessId = event.getComponentId(); + + final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId); + remotePortProcess.setName(event.getComponentType()); + remotePortProcess.addProcessor(portProcessId); + + // For RemoteInputPort, need to find the previous component connected to this port, + // which passed this particular FlowFile. + // That is only possible by calling lineage API. + if (isRemoteInputPort) { + final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event); + if (previousEvent == null) { + logger.warn("Previous event was not found: {}", new Object[]{event}); + return; + } + + // Set groupId from incoming connection if available. + final List<ConnectionStatus> incomingRelationShips = nifiFlow.getIncomingRelationShips(portProcessId); --- End diff -- @ijokarumawak yes, there is a distinction. A Relationship is defined by a processor to indicate which route a FlowFile should take. A Connection is defined by a user when creating the dataflow. Each Connection consists of one or more Relationships, and the same Relationship can be included in multiple Connections. So when a processor transfers FlowFile A to the 'success' relationship, for example, the FlowFile may be placed into a single Connection, or cloned and added to 5 different Connections.
Or the relationship may be auto-terminated, in which case the FlowFile does not go into any Connection at all. Does that make sense?
---