[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292728#comment-16292728 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2335#discussion_r157237190
  
    --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.atlas.provenance.lineage;
    +
    +import org.apache.atlas.typesystem.Referenceable;
    +import org.apache.nifi.atlas.NiFiFlow;
    +import org.apache.nifi.atlas.NiFiFlowPath;
    +import org.apache.nifi.atlas.provenance.AnalysisContext;
    +import org.apache.nifi.atlas.provenance.DataSetRefs;
    +import org.apache.nifi.controller.status.ConnectionStatus;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.lineage.ComputeLineageResult;
    +import org.apache.nifi.provenance.lineage.LineageEdge;
    +import org.apache.nifi.provenance.lineage.LineageNode;
    +import org.apache.nifi.provenance.lineage.LineageNodeType;
    +
    +import java.util.List;
    +
    +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
    +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
    +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
    +
    +public class SimpleFlowPathLineage extends AbstractLineageStrategy {
    +
    +    @Override
    +    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
    +        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
    +        if (refs == null || (refs.isEmpty())) {
    +            return;
    +        }
    +
    +        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
    +            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
    +        } else {
    +            addDataSetRefs(nifiFlow, refs);
    +        }
    +
    +    }
    +
    +    /**
    +     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
    +     * Such an entity cannot be created in advance while analyzing the flow statically,
    +     * because the ReportingTask cannot determine whether a component id is a RemoteGroupPort:
    +     * ConnectionStatus is the only information available in the ReportingContext,
    +     * and ConnectionStatus knows only the component id, not the component type.
    +     * For example, there is no way to tell whether a connected component is a funnel or a RemoteGroupPort.
    +     */
    +    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
    +
    +        final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
    +
    +        // Create a RemoteInputPort Process.
    +        // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
    +        // See NIFI-4571 for detail.
    +        final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
    +        final String portProcessId = event.getComponentId();
    +
    +        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
    +        remotePortProcess.setName(event.getComponentType());
    +        remotePortProcess.addProcessor(portProcessId);
    +
    +        // For RemoteInputPort, need to find the previous component connected to this port,
    +        // which passed this particular FlowFile.
    +        // That is only possible by calling lineage API.
    +        if (isRemoteInputPort) {
    +            final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event);
    +            if (previousEvent == null) {
    +                logger.warn("Previous event was not found: {}", new Object[]{event});
    +                return;
    +            }
    +
    +            // Set groupId from incoming connection if available.
    +            final List<ConnectionStatus> incomingRelationShips = nifiFlow.getIncomingRelationShips(portProcessId);
    --- End diff --
    
    @ijokarumawak yes, there is a distinction. A 'relationship' is defined by a 
processor in order to indicate which route a FlowFile should take. A Connection 
is defined by a user when creating the dataflow. The connection consists of one 
or more Relationships, and multiple connections can consist of the same 
relationship. So when a processor transfers FlowFile A to relationship 
'success' for example, it may be placed into a single Connection. Or cloned and 
added to 5 different Connections. Or it may be auto-terminated and not go into 
any Connections. Does that make sense?


> Export NiFi flow dataset lineage to Apache Atlas
> ------------------------------------------------
>
>                 Key: NIFI-3709
>                 URL: https://issues.apache.org/jira/browse/NIFI-3709
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event-level lineage support within its
> data flow, Apache Atlas also manages lineage between datasets and the
> processes that interact with them.
> It would be beneficial for users of both NiFi and Atlas to see end-to-end
> data lineage in the Atlas lineage graph, since some types of datasets, for
> example Kafka topics and Hive tables, are processed both by NiFi and by
> technologies around Atlas such as Storm, Falcon or Sqoop.
> In order to make this integration happen, I propose a NiFi reporting task
> that analyzes the NiFi flow and then creates DataSet and Process entities in Atlas.
> The challenge is how to design NiFi flow dataset-level lineage within the
> Atlas lineage graph.
> If we just add a single NiFi process and connect every DataSet from/to it, it
> would be too ambiguous, since it won't be clear which part of a NiFi flow
> actually interacts with a given dataset.
> But if we put every NiFi processor as an independent process in Atlas, it
> would be too granular. Also, since we already have detailed event-level
> lineage in NiFi, we wouldn't need the same level in Atlas.
> If we can group certain processors in a NiFi flow as a process in Atlas, it
> would be a nice granularity.
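
One way to picture that middle granularity is to collapse each linear chain of processors into a single Atlas process. The Java sketch below is only an illustration under stated assumptions: the flow is given as a hypothetical adjacency map of processor names, and the grouping rule is hypothetical, not the one used by the reporting task. A path keeps extending while the current processor has exactly one downstream processor and that processor has exactly one upstream processor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class FlowPathGrouping {

    // Groups processors into paths using the hypothetical linear-chain rule.
    static List<List<String>> group(Map<String, List<String>> downstream) {
        // Build the reverse (upstream) map, including processors that only appear as targets.
        Map<String, List<String>> upstream = new HashMap<>();
        for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
            upstream.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
            for (String next : e.getValue()) {
                upstream.computeIfAbsent(next, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        List<List<String>> paths = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        Set<String> all = new TreeSet<>(upstream.keySet()); // sorted for deterministic output
        // First pass starts paths only at chain heads; second pass picks up pure cycles.
        for (boolean headsOnly : new boolean[] {true, false}) {
            for (String start : all) {
                if (visited.contains(start) || (headsOnly && continuesChain(start, downstream, upstream))) {
                    continue;
                }
                List<String> path = new ArrayList<>();
                String current = start;
                while (current != null && visited.add(current)) {
                    path.add(current);
                    List<String> next = downstream.getOrDefault(current, List.of());
                    // Extend only while the chain stays one-in/one-out.
                    current = (next.size() == 1 && upstream.get(next.get(0)).size() == 1) ? next.get(0) : null;
                }
                paths.add(path);
            }
        }
        return paths;
    }

    // True when p's only upstream processor has p as its only downstream processor.
    static boolean continuesChain(String p, Map<String, List<String>> downstream,
                                  Map<String, List<String>> upstream) {
        List<String> ups = upstream.get(p);
        return ups.size() == 1 && downstream.getOrDefault(ups.get(0), List.of()).size() == 1;
    }

    public static void main(String[] args) {
        // Hypothetical flow: ConsumeKafka -> UpdateAttribute -> RouteOnAttribute -> {PutHDFS, PutHiveQL}
        Map<String, List<String>> flow = Map.of(
                "ConsumeKafka", List.of("UpdateAttribute"),
                "UpdateAttribute", List.of("RouteOnAttribute"),
                "RouteOnAttribute", List.of("PutHDFS", "PutHiveQL"));
        System.out.println(group(flow));
        // prints [[ConsumeKafka, UpdateAttribute, RouteOnAttribute], [PutHDFS], [PutHiveQL]]
    }
}
```

Each resulting path would map to one Atlas Process, with the Kafka topic as an input of the first path and the HDFS path and Hive table as outputs of the other two, which is coarser than one process per processor but still shows which part of the flow touches which dataset.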



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
