http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java new file mode 100644 index 0000000..42e407d --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; + +public abstract class UnknownDataSet extends AbstractNiFiProvenanceEventAnalyzer { + + protected static final String TYPE = "nifi_data"; + + protected Referenceable createDataSetRef(AnalysisContext context, ProvenanceEventRecord event) { + final Referenceable ref = new Referenceable(TYPE); + ref.set(ATTR_NAME, event.getComponentType()); + ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(context.getNiFiClusterName(), event.getComponentId())); + ref.set(ATTR_DESCRIPTION, event.getEventType() + " was performed by " + event.getComponentType()); + return ref; + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java new file mode 100644 index 0000000..f16908b --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public abstract class UnknownInput extends UnknownDataSet { + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + + final String componentId = event.getComponentId(); + final DataSetRefs refs = new DataSetRefs(componentId); + final Referenceable ref = createDataSetRef(context, event); + refs.addInput(ref); + + return refs; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java new file mode 100644 index 0000000..5d564c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer.unknown; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public abstract class UnknownOutput extends UnknownDataSet { + + @Override + public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + + final String componentId = event.getComponentId(); + final DataSetRefs refs = new DataSetRefs(componentId); + final Referenceable ref = createDataSetRef(context, event); + refs.addOutput(ref); + + return refs; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java new file mode 100644 index 0000000..11d6e8b --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiAtlasHook.NIFI_USER; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +public abstract class AbstractLineageStrategy implements LineageStrategy { + + protected Logger logger = LoggerFactory.getLogger(getClass()); + private LineageContext lineageContext; + + public void setLineageContext(LineageContext lineageContext) { + this.lineageContext = lineageContext; + } + + protected DataSetRefs executeAnalyzer(AnalysisContext analysisContext, ProvenanceEventRecord event) { + final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(event.getComponentType(), event.getTransitUri(), event.getEventType()); + if (analyzer == null) { + return null; + } + if (logger.isDebugEnabled()) { + logger.debug("Analyzer {} is found for event: {}", analyzer, event); + } + return analyzer.analyze(analysisContext, event); + } + + protected void addDataSetRefs(NiFiFlow nifiFlow, DataSetRefs refs) { + + final Set<NiFiFlowPath> flowPaths = refs.getComponentIds().stream() + .map(componentId -> { + final NiFiFlowPath flowPath = nifiFlow.findPath(componentId); + if (flowPath == null) { + logger.warn("FlowPath for {} was not found.", componentId); + } + return flowPath; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + addDataSetRefs(nifiFlow, flowPaths, refs); + } + + protected void addDataSetRefs(NiFiFlow nifiFlow, Set<NiFiFlowPath> flowPaths, DataSetRefs refs) { + // create reference to NiFi flow path. + final Referenceable flowRef = toReferenceable(nifiFlow); + final String clusterName = nifiFlow.getClusterName(); + final String url = nifiFlow.getUrl(); + + for (NiFiFlowPath flowPath : flowPaths) { + final Referenceable flowPathRef = toReferenceable(flowPath, flowRef, clusterName, url); + addDataSetRefs(refs, flowPathRef); + } + } + + private Referenceable toReferenceable(NiFiFlow nifiFlow) { + final Referenceable flowRef = new Referenceable(TYPE_NIFI_FLOW); + flowRef.set(ATTR_NAME, nifiFlow.getFlowName()); + flowRef.set(ATTR_QUALIFIED_NAME, nifiFlow.getQualifiedName()); + flowRef.set(ATTR_URL, nifiFlow.getUrl()); + return flowRef; + } + + protected Referenceable toReferenceable(NiFiFlowPath flowPath, NiFiFlow nifiFlow) { + return toReferenceable(flowPath, toReferenceable(nifiFlow), + nifiFlow.getClusterName(), nifiFlow.getUrl()); + } + + private Referenceable toReferenceable(NiFiFlowPath flowPath, Referenceable flowRef, String clusterName, String nifiUrl) { + final Referenceable flowPathRef = new Referenceable(TYPE_NIFI_FLOW_PATH); + flowPathRef.set(ATTR_NAME, flowPath.getName()); + flowPathRef.set(ATTR_QUALIFIED_NAME, flowPath.getId() + "@" + clusterName); + flowPathRef.set(ATTR_NIFI_FLOW, flowRef); + flowPathRef.set(ATTR_URL, flowPath.createDeepLinkURL(nifiUrl)); + // Referenceable has to have GUID assigned, otherwise it will not be stored due to lack of required attribute. + // If a Referencible has GUID, Atlas does not validate all required attributes. + flowPathRef.set(ATTR_INPUTS, flowPath.getInputs().stream().map(this::toReferenceable).collect(Collectors.toList())); + flowPathRef.set(ATTR_OUTPUTS, flowPath.getOutputs().stream().map(this::toReferenceable).collect(Collectors.toList())); + return flowPathRef; + } + + private Referenceable toReferenceable(AtlasObjectId id) { + return StringUtils.isEmpty(id.getGuid()) + ? new Referenceable(id.getTypeName(), id.getUniqueAttributes()) + : new Referenceable(id.getGuid(), id.getTypeName(), id.getUniqueAttributes()); + } + + protected void createEntity(Referenceable ... entities) { + final HookNotification.EntityCreateRequest msg = new HookNotification.EntityCreateRequest(NIFI_USER, entities); + lineageContext.addMessage(msg); + } + + @SuppressWarnings("unchecked") + private boolean addDataSetRefs(Set<Referenceable> refsToAdd, Referenceable nifiFlowPath, String targetAttribute) { + if (refsToAdd != null && !refsToAdd.isEmpty()) { + + // If nifiFlowPath already has a given dataSetRef, then it needs not to be created. + final Function<Referenceable, String> toTypedQualifiedName = ref -> toTypedQualifiedName(ref.getTypeName(), toStr(ref.get(ATTR_QUALIFIED_NAME))); + final Collection<Referenceable> refs = Optional.ofNullable((Collection<Referenceable>) nifiFlowPath.get(targetAttribute)).orElseGet(ArrayList::new); + final Set<String> existingRefTypedQualifiedNames = refs.stream().map(toTypedQualifiedName).collect(Collectors.toSet()); + + refsToAdd.stream().filter(ref -> !existingRefTypedQualifiedNames.contains(toTypedQualifiedName.apply(ref))) + .forEach(ref -> { + if (ref.getId().isUnassigned()) { + // Create new entity. + logger.debug("Found a new DataSet reference from {} to {}, sending an EntityCreateRequest", + new Object[]{toTypedQualifiedName.apply(nifiFlowPath), toTypedQualifiedName.apply(ref)}); + final HookNotification.EntityCreateRequest createDataSet = new HookNotification.EntityCreateRequest(NIFI_USER, ref); + lineageContext.addMessage(createDataSet); + } + refs.add(ref); + }); + + if (refs.size() > existingRefTypedQualifiedNames.size()) { + // Something has been added. + nifiFlowPath.set(targetAttribute, refs); + return true; + } + } + return false; + } + + protected void addDataSetRefs(DataSetRefs dataSetRefs, Referenceable flowPathRef) { + final boolean inputsAdded = addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS); + final boolean outputsAdded = addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS); + if (inputsAdded || outputsAdded) { + lineageContext.addMessage(new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, + ATTR_QUALIFIED_NAME, (String) flowPathRef.get(ATTR_QUALIFIED_NAME), flowPathRef)); + } + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java new file mode 100644 index 0000000..4437bfc --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.LineageNode; +import org.apache.nifi.provenance.lineage.LineageNodeType; +import org.apache.nifi.util.Tuple; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.zip.CRC32; + +import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; +import static org.apache.nifi.provenance.ProvenanceEventType.DROP; + +public class CompleteFlowPathLineage extends AbstractLineageStrategy { + + @Override + public ProvenanceEventType[] getTargetEventTypes() { + return new ProvenanceEventType[]{DROP}; + } + + @Override + public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) { + if (!ProvenanceEventType.DROP.equals(event.getEventType())) { + return; + } + final ComputeLineageResult lineage = analysisContext.queryLineage(event.getEventId()); + + // Construct a tree model to traverse backwards. + final Map<String, List<LineageNode>> lineageTree = new HashMap<>(); + analyzeLineageTree(lineage, lineageTree); + + final LineagePath lineagePath = new LineagePath(); + extractLineagePaths(analysisContext, lineageTree, lineagePath, event); + + analyzeLineagePath(analysisContext, lineagePath); + + // Input and output data set are both required to report lineage. + List<Tuple<NiFiFlowPath, DataSetRefs>> createdFlowPaths = new ArrayList<>(); + if (lineagePath.isComplete()) { + createCompleteFlowPath(nifiFlow, lineagePath, createdFlowPaths); + for (Tuple<NiFiFlowPath, DataSetRefs> createdFlowPath : createdFlowPaths) { + final NiFiFlowPath flowPath = createdFlowPath.getKey(); + createEntity(toReferenceable(flowPath, nifiFlow)); + addDataSetRefs(nifiFlow, Collections.singleton(flowPath), createdFlowPath.getValue()); + } + createdFlowPaths.clear(); + } + } + + private List<LineageNode> findParentEvents(Map<String, List<LineageNode>> lineageTree, ProvenanceEventRecord event) { + List<LineageNode> parentNodes = lineageTree.get(String.valueOf(event.getEventId())); + return parentNodes == null || parentNodes.isEmpty() ? null : parentNodes.stream() + // In case it's not a provenance event (i.e. FLOWFILE_NODE), get one level higher parents. + .flatMap(n -> !LineageNodeType.PROVENANCE_EVENT_NODE.equals(n.getNodeType()) + ? lineageTree.get(n.getIdentifier()).stream() : Stream.of(n)) + .collect(Collectors.toList()); + } + + private void extractLineagePaths(AnalysisContext context, Map<String, List<LineageNode>> lineageTree, + LineagePath lineagePath, ProvenanceEventRecord lastEvent) { + + lineagePath.getEvents().add(lastEvent); + List<LineageNode> parentEvents = findParentEvents(lineageTree, lastEvent); + + final boolean createSeparateParentPath = lineagePath.shouldCreateSeparatePath(lastEvent.getEventType()); + + if (createSeparateParentPath && (parentEvents == null || parentEvents.isEmpty())) { + // Try expanding the lineage. + // This is for the FlowFiles those are FORKed (or JOINed ... etc) other FlowFile(s). + // FlowFiles routed to 'original' may have these event types, too, however they have parents fetched together. + + // For example, with these inputs: CREATE(F1), FORK (F1 -> F2, F3), DROP(F1), SEND (F2), SEND(F3), DROP(F2), DROP(F3) + // Then when DROP(F1) is queried, FORK(F1) and CREATE(F1) are returned. + // For DROP(F2), SEND(F2) and FORK(F2) are returned. + // For DROP(F3), SEND(F3) and FORK(F3) are returned. + // In this case, FORK(F2) and FORK(F3) have to query their parents again, to get CREATE(F1). + final ComputeLineageResult joinedParents = context.findParents(lastEvent.getEventId()); + analyzeLineageTree(joinedParents, lineageTree); + + parentEvents = findParentEvents(lineageTree, lastEvent); + } + + if (parentEvents == null || parentEvents.isEmpty()) { + logger.debug("{} does not have any parent, stop extracting lineage path.", lastEvent); + return; + } + + if (createSeparateParentPath) { + // Treat those as separated lineage_path + parentEvents.stream() + .map(parentEvent -> context.getProvenanceEvent(Long.parseLong(parentEvent.getIdentifier()))) + .filter(Objects::nonNull) + .forEach(parent -> { + final LineagePath parentPath = new LineagePath(); + lineagePath.getParents().add(parentPath); + extractLineagePaths(context, lineageTree, parentPath, parent); + }); + } else { + // Simply traverse upwards. + if (parentEvents.size() > 1) { + throw new IllegalStateException(String.format("Having more than 1 parents for event type %s" + + " is not expected. Should ask NiFi developer for investigation. %s", + lastEvent.getEventType(), lastEvent)); + } + final ProvenanceEventRecord parentEvent = context.getProvenanceEvent(Long.parseLong(parentEvents.get(0).getIdentifier())); + if (parentEvent != null) { + extractLineagePaths(context, lineageTree, lineagePath, parentEvent); + } + } + } + + private void analyzeLineagePath(AnalysisContext analysisContext, LineagePath lineagePath) { + final List<ProvenanceEventRecord> events = lineagePath.getEvents(); + + final DataSetRefs parentRefs = new DataSetRefs(events.get(0).getComponentId()); + events.forEach(event -> { + final DataSetRefs refs = executeAnalyzer(analysisContext, event); + if (refs == null || refs.isEmpty()) { + return; + } + refs.getInputs().forEach(parentRefs::addInput); + refs.getOutputs().forEach(parentRefs::addOutput); + }); + + lineagePath.setRefs(parentRefs); + + // Analyse parents. + lineagePath.getParents().forEach(parent -> analyzeLineagePath(analysisContext, parent)); + } + + private void analyzeLineageTree(ComputeLineageResult lineage, Map<String, List<LineageNode>> lineageTree) { + lineage.getEdges().forEach(edge -> lineageTree + .computeIfAbsent(edge.getDestination().getIdentifier(), k -> new ArrayList<>()) + .add(edge.getSource())); + } + + /** + * Create a new FlowPath from a LineagePath. FlowPaths created by this method will have a hash in its qualified name. + * + * <p>This method processes parents first to generate a hash, as parent LineagePath hashes contribute child hash + * in order to distinguish FlowPaths based on the complete path for a given FlowFile. + * For example, even if two lineagePaths have identical componentIds/inputs/outputs, + * if those parents have different inputs, those should be treated as different paths.</p> + * + * @param nifiFlow A reference to current NiFiFlow + * @param lineagePath LineagePath from which NiFiFlowPath and DataSet refs are created and added to the {@code createdFlowPaths}. + * @param createdFlowPaths A list to buffer created NiFiFlowPaths, + * in order to defer sending notification to Kafka until all parent FlowPath get analyzed. + */ + private void createCompleteFlowPath(NiFiFlow nifiFlow, LineagePath lineagePath, List<Tuple<NiFiFlowPath, DataSetRefs>> createdFlowPaths) { + + final List<ProvenanceEventRecord> events = lineagePath.getEvents(); + Collections.reverse(events); + + final List<String> componentIds = events.stream().map(ProvenanceEventRecord::getComponentId).collect(Collectors.toList()); + final String firstComponentId = events.get(0).getComponentId(); + final DataSetRefs dataSetRefs = lineagePath.getRefs(); + + // Process parents first. + Referenceable queueBetweenParent = null; + if (!lineagePath.getParents().isEmpty()) { + // Add queue between this lineage path and parent. + queueBetweenParent = new Referenceable(TYPE_NIFI_QUEUE); + // The first event knows why this lineage has parents, e.g. FORK or JOIN. + final String firstEventType = events.get(0).getEventType().name(); + queueBetweenParent.set(ATTR_NAME, firstEventType); + dataSetRefs.addInput(queueBetweenParent); + + for (LineagePath parent : lineagePath.getParents()) { + parent.getRefs().addOutput(queueBetweenParent); + createCompleteFlowPath(nifiFlow, parent, createdFlowPaths); + } + } + + // Create a variant path. + // Calculate a hash from component_ids and input and output resource ids. + final Stream<String> ioIds = Stream.concat(dataSetRefs.getInputs().stream(), dataSetRefs.getOutputs() + .stream()).map(ref -> toTypedQualifiedName(ref.getTypeName(), toStr(ref.get(ATTR_QUALIFIED_NAME)))); + + final Stream<String> parentHashes = lineagePath.getParents().stream().map(p -> String.valueOf(p.getLineagePathHash())); + final CRC32 crc32 = new CRC32(); + crc32.update(Stream.of(componentIds.stream(), ioIds, parentHashes).reduce(Stream::concat).orElseGet(Stream::empty) + .sorted().distinct() + .collect(Collectors.joining(",")).getBytes(StandardCharsets.UTF_8)); + + final long hash = crc32.getValue(); + lineagePath.setLineagePathHash(hash); + final NiFiFlowPath flowPath = new NiFiFlowPath(firstComponentId, hash); + + // In order to differentiate a queue between parents and this flow_path, add the hash into the queue qname. + // E.g, FF1 and FF2 read from dirA were merged, vs FF3 and FF4 read from dirB were merged then passed here, these two should be different queue. + if (queueBetweenParent != null) { + queueBetweenParent.set(ATTR_QUALIFIED_NAME, toQualifiedName(nifiFlow.getClusterName(), firstComponentId + "::" + hash)); + } + + // If the same components emitted multiple provenance events consecutively, merge it to come up with a simpler name. + String previousComponentId = null; + List<ProvenanceEventRecord> uniqueEventsForName = new ArrayList<>(); + for (ProvenanceEventRecord event : events) { + if (!event.getComponentId().equals(previousComponentId)) { + uniqueEventsForName.add(event); + } + previousComponentId = event.getComponentId(); + } + + final String pathName = uniqueEventsForName.stream() + // Processor name can be configured by user and more meaningful if available. + // If the component is already removed, it may not be available here. + .map(event -> nifiFlow.getProcessComponentName(event.getComponentId(), event::getComponentType)) + .collect(Collectors.joining(", ")); + + flowPath.setName(pathName); + final NiFiFlowPath staticFlowPath = nifiFlow.findPath(firstComponentId); + flowPath.setGroupId(staticFlowPath != null ? staticFlowPath.getGroupId() : nifiFlow.getRootProcessGroupId()); + + // To defer send notification until entire lineagePath analysis gets finished, just add the instance into a buffer. + createdFlowPaths.add(new Tuple<>(flowPath, dataSetRefs)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java new file mode 100644 index 0000000..060cfbe --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.notification.hook.HookNotification; + +public interface LineageContext { + void addMessage(HookNotification.HookNotificationMessage message); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java new file mode 100644 index 0000000..d1033f3 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineagePath.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +import java.util.ArrayList; +import java.util.List; + +public class LineagePath { + private List<ProvenanceEventRecord> events = new ArrayList<>(); + private List<LineagePath> parents = new ArrayList<>(); + private DataSetRefs refs; + private long lineagePathHash; + + /** + * NOTE: The list contains provenance events in reversed order, i.e. the last one first. + */ + public List<ProvenanceEventRecord> getEvents() { + return events; + } + + public List<LineagePath> getParents() { + return parents; + } + + public DataSetRefs getRefs() { + return refs; + } + + public void setRefs(DataSetRefs refs) { + this.refs = refs; + } + + public boolean shouldCreateSeparatePath(ProvenanceEventType eventType) { + switch (eventType) { + case CLONE: + case JOIN: + case FORK: + case REPLAY: + return true; + } + return false; + } + + public boolean isComplete() { + // If the FlowFile is DROPed right after create child FlowFile, then the path is not worth for reporting. + final boolean isDroppedImmediately = events.size() == 2 + && events.get(0).getEventType().equals(ProvenanceEventType.DROP) + && shouldCreateSeparatePath(events.get(1).getEventType()); + return !isDroppedImmediately && hasInput() && hasOutput(); + } + + public boolean hasInput() { + return (refs != null && !refs.getInputs().isEmpty()) || parents.stream().anyMatch(parent -> parent.hasInput()); + } + + public boolean hasOutput() { + return (refs != null && !refs.getOutputs().isEmpty()) || parents.stream().anyMatch(parent -> parent.hasOutput()); + } + + public long getLineagePathHash() { + return lineagePathHash; + } + + public void setLineagePathHash(long lineagePathHash) { + this.lineagePathHash = lineagePathHash; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java new file mode 100644 index 0000000..bf1139b --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +public interface LineageStrategy { + + default ProvenanceEventType[] getTargetEventTypes(){ + return new ProvenanceEventType[0]; + } + + void setLineageContext(LineageContext lineageContext); + + void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java new file mode 100644 index 0000000..7ecbed5 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.LineageEdge; +import org.apache.nifi.provenance.lineage.LineageNode; +import org.apache.nifi.provenance.lineage.LineageNodeType; + +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class SimpleFlowPathLineage extends AbstractLineageStrategy { + + @Override + public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) { + final DataSetRefs refs = executeAnalyzer(analysisContext, event); + if (refs == null || (refs.isEmpty())) { + return; + } + + if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) { + processRemotePortEvent(analysisContext, nifiFlow, event, refs); + } else { + addDataSetRefs(nifiFlow, refs); + } + + } + + /** + * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received. + * Because such entity can not be created in advance while analyzing flow statically, + * as ReportingTask can not determine whether a component id is a RemoteGroupPort, + * since connectionStatus is the only available information in ReportingContext. + * ConnectionStatus only knows component id, component type is unknown. + * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort. + */ + private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) { + + final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType()); + + // Create a RemoteInputPort Process. + // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid). + // See NIFI-4571 for detail. + final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next(); + final String portProcessId = event.getComponentId(); + + final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId); + remotePortProcess.setName(event.getComponentType()); + remotePortProcess.addProcessor(portProcessId); + + // For RemoteInputPort, need to find the previous component connected to this port, + // which passed this particular FlowFile. + // That is only possible by calling lineage API. + if (isRemoteInputPort) { + final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event); + if (previousEvent == null) { + logger.warn("Previous event was not found: {}", new Object[]{event}); + return; + } + + // Set groupId from incoming connection if available. + final List<ConnectionStatus> incomingConnections = nifiFlow.getIncomingConnections(portProcessId); + if (incomingConnections == null || incomingConnections.isEmpty()) { + logger.warn("Incoming relationship was not found: {}", new Object[]{event}); + return; + } + + final ConnectionStatus connection = incomingConnections.get(0); + remotePortProcess.setGroupId(connection.getGroupId()); + + final Referenceable remotePortProcessRef = toReferenceable(remotePortProcess, nifiFlow); + createEntity(remotePortProcessRef); + + // Create a queue. + Referenceable queueFromStaticFlowPathToRemotePortProcess = new Referenceable(TYPE_NIFI_QUEUE); + queueFromStaticFlowPathToRemotePortProcess.set(ATTR_NAME, "queue"); + queueFromStaticFlowPathToRemotePortProcess.set(ATTR_QUALIFIED_NAME, nifiFlow.toQualifiedName(portProcessId)); + + // Create lineage: Static flow_path -> queue + DataSetRefs staticFlowPathRefs = new DataSetRefs(previousEvent.getComponentId()); + staticFlowPathRefs.addOutput(queueFromStaticFlowPathToRemotePortProcess); + addDataSetRefs(nifiFlow, staticFlowPathRefs); + + + // Create lineage: Queue -> RemoteInputPort process -> RemoteInputPort dataSet + DataSetRefs remotePortRefs = new DataSetRefs(portProcessId); + remotePortRefs.addInput(queueFromStaticFlowPathToRemotePortProcess); + remotePortRefs.addOutput(remotePortDataSet); + addDataSetRefs(remotePortRefs, remotePortProcessRef); + + } else { + // For RemoteOutputPort, it's possible that multiple processors are connected. + // In that case, the received FlowFile is cloned and passed to each connection. + // So we need to create multiple DataSetRefs. + final List<ConnectionStatus> connections = nifiFlow.getOutgoingConnections(portProcessId); + if (connections == null || connections.isEmpty()) { + logger.warn("Incoming connection was not found: {}", new Object[]{event}); + return; + } + + // Set group id from outgoing connection if available. + remotePortProcess.setGroupId(connections.get(0).getGroupId()); + + final Referenceable remotePortProcessRef = toReferenceable(remotePortProcess, nifiFlow); + createEntity(remotePortProcessRef); + + // Create lineage: RemoteOutputPort dataSet -> RemoteOutputPort process + DataSetRefs remotePortRefs = new DataSetRefs(portProcessId); + remotePortRefs.addInput(remotePortDataSet); + addDataSetRefs(remotePortRefs, remotePortProcessRef); + + for (ConnectionStatus connection : connections) { + final String destinationId = connection.getDestinationId(); + final NiFiFlowPath destFlowPath = nifiFlow.findPath(destinationId); + if (destFlowPath == null) { + // If the destination of a connection is a Remote Input Port, + // then its corresponding flow path may not be created yet. + // In such direct RemoteOutputPort to RemoteInputPort case, do not add a queue from this RemoteOutputPort + // as a queue will be created by the connected RemoteInputPort to connect this RemoteOutputPort. + continue; + } + + // Create a queue. + Referenceable queueFromRemotePortProcessToStaticFlowPath = new Referenceable(TYPE_NIFI_QUEUE); + queueFromRemotePortProcessToStaticFlowPath.set(ATTR_NAME, "queue"); + queueFromRemotePortProcessToStaticFlowPath.set(ATTR_QUALIFIED_NAME, nifiFlow.toQualifiedName(destinationId)); + + // Create lineage: Queue -> Static flow_path + DataSetRefs staticFlowPathRefs = new DataSetRefs(destinationId); + staticFlowPathRefs.addInput(queueFromRemotePortProcessToStaticFlowPath); + addDataSetRefs(nifiFlow, staticFlowPathRefs); + + // Create lineage: RemoteOutputPort dataSet -> RemoteOutputPort process -> Queue + remotePortRefs.addOutput(queueFromRemotePortProcessToStaticFlowPath); + addDataSetRefs(remotePortRefs, remotePortProcessRef); + } + + // Add RemoteOutputPort process, so that it can be found even if it is connected to RemoteInputPort directory without any processor in between. + nifiFlow.getFlowPaths().put(remotePortProcess.getId(), remotePortProcess); + + } + + } + + private ProvenanceEventRecord findPreviousProvenanceEvent(AnalysisContext context, ProvenanceEventRecord event) { + final ComputeLineageResult lineage = context.queryLineage(event.getEventId()); + if (lineage == null) { + logger.warn("Lineage was not found: {}", new Object[]{event}); + return null; + } + + // If no previous provenance node found due to expired or other reasons, just log a warning msg and do nothing. + final LineageNode previousProvenanceNode = traverseLineage(lineage, String.valueOf(event.getEventId())); + if (previousProvenanceNode == null) { + logger.warn("Traverse lineage could not find any preceding provenance event node: {}", new Object[]{event}); + return null; + } + + final long previousEventId = Long.parseLong(previousProvenanceNode.getIdentifier()); + return context.getProvenanceEvent(previousEventId); + } + + /** + * Recursively traverse lineage graph until a preceding provenance event is found. + */ + private LineageNode traverseLineage(ComputeLineageResult lineage, String eventId) { + final LineageNode previousNode = lineage.getEdges().stream() + .filter(edge -> edge.getDestination().getIdentifier().equals(String.valueOf(eventId))) + .findFirst().map(LineageEdge::getSource).orElse(null); + if (previousNode == null) { + return null; + } + if (previousNode.getNodeType().equals(LineageNodeType.PROVENANCE_EVENT_NODE)) { + return previousNode; + } + return traverseLineage(lineage, previousNode.getIdentifier()); + } + + +}
