Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2335#discussion_r157004365
--- Diff:
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+ /**
+  * Analyzes a single provenance event and records the DataSet references derived from it.
+  * Events whose analysis yields no refs (null or empty) are ignored. Events whose component
+  * type is "Remote Input Port" or "Remote Output Port" are delegated to
+  * processRemotePortEvent; for every other event the refs are added to the flow directly.
+  *
+  * @param analysisContext context passed through to the analyzer and the remote-port handling
+  * @param nifiFlow the flow that receives the analyzed DataSet references
+  * @param event the provenance event to process
+  */
+ @Override
+ public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+     final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+     final boolean hasRefs = refs != null && !refs.isEmpty();
+     if (!hasRefs) {
+         // The analyzer produced nothing for this event, so there is nothing to record.
+         return;
+     }
+
+     final String componentType = event.getComponentType();
+     final boolean isRemotePort = "Remote Input Port".equals(componentType)
+             || "Remote Output Port".equals(componentType);
+     if (!isRemotePort) {
+         addDataSetRefs(nifiFlow, refs);
+         return;
+     }
+
+     // Remote ports need dedicated handling rather than a plain refs registration.
+     processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+ }
+
+ /**
+ * Create a flow_path entity corresponding to the target
RemoteGroupPort when a SEND/RECEIVE event is received.
+ * Such an entity cannot be created in advance while analyzing the
flow statically,
+ * because the ReportingTask cannot determine whether a component id is a
RemoteGroupPort:
+ * connectionStatus is the only available information in
ReportingContext.
+ * ConnectionStatus only knows the component id; the component type is unknown.
+ * For example, there is no way to tell whether a connected
component is a funnel or a RemoteGroupPort.
+ */
+ private void processRemotePortEvent(AnalysisContext analysisContext,
NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+ final boolean isRemoteInputPort = "Remote Input
Port".equals(event.getComponentType());
+
+ // Create a RemoteInputPort Process.
+ // event.getComponentId returns UUID for RemoteGroupPort as a
client of S2S, and it's different from a remote port UUID (portDataSetid).
+ // See NIFI-4571 for detail.
+ final Referenceable remotePortDataSet = isRemoteInputPort ?
analyzedRefs.getOutputs().iterator().next() :
analyzedRefs.getInputs().iterator().next();
+ final String portProcessId = event.getComponentId();
+
+ final NiFiFlowPath remotePortProcess = new
NiFiFlowPath(portProcessId);
+ remotePortProcess.setName(event.getComponentType());
+ remotePortProcess.addProcessor(portProcessId);
+
+ // For RemoteInputPort, need to find the previous component
connected to this port,
+ // which passed this particular FlowFile.
+ // That is only possible by calling lineage API.
+ if (isRemoteInputPort) {
+ final ProvenanceEventRecord previousEvent =
findPreviousProvenanceEvent(analysisContext, event);
+ if (previousEvent == null) {
+ logger.warn("Previous event was not found: {}", new
Object[]{event});
+ return;
+ }
+
+ // Set groupId from incoming connection if available.
+ final List<ConnectionStatus> incomingRelationShips =
nifiFlow.getIncomingRelationShips(portProcessId);
--- End diff --
I think this is meant to get incoming connections, not relationships, right?
---