GitHub user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2335#discussion_r157237190
  
    --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.atlas.provenance.lineage;
    +
    +import org.apache.atlas.typesystem.Referenceable;
    +import org.apache.nifi.atlas.NiFiFlow;
    +import org.apache.nifi.atlas.NiFiFlowPath;
    +import org.apache.nifi.atlas.provenance.AnalysisContext;
    +import org.apache.nifi.atlas.provenance.DataSetRefs;
    +import org.apache.nifi.controller.status.ConnectionStatus;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.lineage.ComputeLineageResult;
    +import org.apache.nifi.provenance.lineage.LineageEdge;
    +import org.apache.nifi.provenance.lineage.LineageNode;
    +import org.apache.nifi.provenance.lineage.LineageNodeType;
    +
    +import java.util.List;
    +
    +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
    +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
    +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
    +
    +public class SimpleFlowPathLineage extends AbstractLineageStrategy {
    +
    +    @Override
    +    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
    +        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
    +        if (refs == null || (refs.isEmpty())) {
    +            return;
    +        }
    +
    +        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
    +            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
    +        } else {
    +            addDataSetRefs(nifiFlow, refs);
    +        }
    +
    +    }
    +
    +    /**
    +     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
    +     * Such an entity cannot be created in advance while analyzing the flow statically,
    +     * because the ReportingTask cannot determine whether a component id is a RemoteGroupPort:
    +     * ConnectionStatus is the only available information in the ReportingContext,
    +     * and it only knows the component id, not the component type.
    +     * For example, it cannot tell whether a connected component is a funnel or a RemoteGroupPort.
    +     */
    +    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
    +
    +        final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
    +
    +        // Create a RemoteInputPort Process.
    +        // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
    +        // See NIFI-4571 for detail.
    +        final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
    +        final String portProcessId = event.getComponentId();
    +
    +        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
    +        remotePortProcess.setName(event.getComponentType());
    +        remotePortProcess.addProcessor(portProcessId);
    +
    +        // For RemoteInputPort, need to find the previous component connected to this port,
    +        // which passed this particular FlowFile.
    +        // That is only possible by calling lineage API.
    +        if (isRemoteInputPort) {
    +            final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event);
    +            if (previousEvent == null) {
    +                logger.warn("Previous event was not found: {}", new Object[]{event});
    +                return;
    +            }
    +
    +            // Set groupId from incoming connection if available.
    +            final List<ConnectionStatus> incomingRelationShips = nifiFlow.getIncomingRelationShips(portProcessId);
    --- End diff --
    
    @ijokarumawak yes, there is a distinction. A 'relationship' is defined by a processor to indicate which route a FlowFile should take. A Connection is defined by a user when building the dataflow. A connection consists of one or more Relationships, and multiple connections can include the same relationship. So when a processor transfers FlowFile A to the 'success' relationship, for example, it may be placed into a single Connection, cloned and added to 5 different Connections, or auto-terminated and not placed into any Connection. Does that make sense?
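To illustrate the relationship/connection distinction described in the reply, here is a minimal Java sketch. It assumes a connection can be reduced to a source id, a destination id, and the set of relationship names it selects. The `Connection` class and the `route`, `incomingConnectionIds`, and `sampleFlow` methods are hypothetical names for this illustration only. They are not NiFi APIs and do not reflect how NiFi actually implements routing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the relationship/connection distinction.
// None of these types or methods are NiFi APIs; they only illustrate routing semantics.
public class RelationshipRoutingSketch {

    /** A user-defined connection from a source component that selects one or more of its relationships. */
    public static final class Connection {
        final String id;
        final String sourceId;
        final String destinationId;
        final Set<String> relationships;

        Connection(String id, String sourceId, String destinationId, Set<String> relationships) {
            this.id = id;
            this.sourceId = sourceId;
            this.destinationId = destinationId;
            this.relationships = relationships;
        }
    }

    /**
     * Ids of the connections that receive a copy of a FlowFile when the source transfers it
     * to the given relationship. An empty result means no connection selects that relationship;
     * in a valid flow such a relationship would have to be auto-terminated.
     */
    public static List<String> route(String sourceId, String relationship, List<Connection> connections) {
        List<String> targets = new ArrayList<>();
        for (Connection c : connections) {
            if (c.sourceId.equals(sourceId) && c.relationships.contains(relationship)) {
                targets.add(c.id);
            }
        }
        return targets;
    }

    /** Incoming connections (not relationships) of a component, matched by destination id. */
    public static List<String> incomingConnectionIds(String componentId, List<Connection> connections) {
        List<String> incoming = new ArrayList<>();
        for (Connection c : connections) {
            if (c.destinationId.equals(componentId)) {
                incoming.add(c.id);
            }
        }
        return incoming;
    }

    /** A small example flow: procA's 'success' relationship is selected by three connections. */
    public static List<Connection> sampleFlow() {
        return List.of(
            new Connection("c1", "procA", "procB", Set.of("success")),
            new Connection("c2", "procA", "procC", Set.of("success", "failure")),
            new Connection("c3", "procA", "remotePort", Set.of("success")));
    }

    public static void main(String[] args) {
        List<Connection> flow = sampleFlow();
        System.out.println(route("procA", "success", flow));   // [c1, c2, c3]: cloned into three connections
        System.out.println(route("procA", "failure", flow));   // [c2]: a single connection
        System.out.println(route("procA", "original", flow));  // []: no connection, auto-terminated
        System.out.println(incomingConnectionIds("remotePort", flow)); // [c3]
    }
}
```

Because one connection can select several relationships and one relationship can feed several connections, the sketch keeps the two as separate concepts. A lookup by destination id, like the `getIncomingRelationShips` call in the quoted diff (which returns `List<ConnectionStatus>`), naturally yields connections rather than relationships.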

