Repository: nifi Updated Branches: refs/heads/master ed9263811 -> 79a7014a9
NIFI-4971: ReportLineageToAtlas complete path can miss one-time lineages - Separated Hook message de-duplication logic from NiFiAtlasHook to NotificationSender - NiFiAtlasHook used to send an individual CREATE_ENTITY message for each entity; this commit changes it to bundle all new entities into a single CREATE_ENTITY message to preserve entity creation order, so that new DataSet entities can be referenced from new nifi_flow_path entities - Added more unit tests This closes #2542 Signed-off-by: Mike Thomsen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/79a7014a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/79a7014a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/79a7014a Branch: refs/heads/master Commit: 79a7014a95dc3087f88248c732fb1e4ad8e6e128 Parents: ed92638 Author: Koji Kawamura <[email protected]> Authored: Fri Mar 9 09:27:20 2018 +0900 Committer: Mike Thomsen <[email protected]> Committed: Tue May 15 05:50:26 2018 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/atlas/NiFiAtlasHook.java | 300 ------------- .../apache/nifi/atlas/hook/NiFiAtlasHook.java | 68 +++ .../nifi/atlas/hook/NotificationSender.java | 392 +++++++++++++++++ .../lineage/AbstractLineageStrategy.java | 4 +- .../lineage/CompleteFlowPathLineage.java | 32 +- .../atlas/reporting/ReportLineageToAtlas.java | 2 +- .../nifi/atlas/hook/TestNotificationSender.java | 417 +++++++++++++++++++ 7 files changed, 911 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java ---------------------------------------------------------------------- diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java deleted file mode 100644 index 58945d5..0000000 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.atlas; - -import com.sun.jersey.api.client.ClientResponse; -import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.hook.AtlasHook; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; -import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.nifi.atlas.provenance.lineage.LineageContext; -import org.apache.nifi.util.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; -import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; -import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; -import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; - -/** - * This class is not thread-safe as it holds uncommitted notification messages within instance. - * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. 
- */ -public class NiFiAtlasHook extends AtlasHook implements LineageContext { - - public static final String NIFI_USER = "nifi"; - - private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasHook.class); - private static final String CONF_PREFIX = "atlas.hook.nifi."; - private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - - private NiFiAtlasClient atlasClient; - - /** - * An index to resolve a qualifiedName from a GUID. - */ - private final Map<String, String> guidToQualifiedName; - /** - * An index to resolve a Referenceable from a typeName::qualifiedName. - */ - private final Map<String, Referenceable> typedQualifiedNameToRef; - - - private static <K, V> Map<K, V> createCache(final int maxSize) { - return new LinkedHashMap<K, V>(maxSize, 0.75f, true) { - @Override - protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { - return size() > maxSize; - } - }; - } - - public NiFiAtlasHook() { - final int qualifiedNameCacheSize = 10_000; - this.guidToQualifiedName = createCache(qualifiedNameCacheSize); - - final int dataSetRefCacheSize = 1_000; - this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize); - } - - public void setAtlasClient(NiFiAtlasClient atlasClient) { - this.atlasClient = atlasClient; - } - - @Override - protected String getNumberOfRetriesPropertyKey() { - return HOOK_NUM_RETRIES; - } - - - private final List<HookNotificationMessage> messages = new ArrayList<>(); - - @Override - public void addMessage(HookNotificationMessage message) { - messages.add(message); - } - - private class Metrics { - final long startedAt = System.currentTimeMillis(); - int totalMessages; - int partialNiFiFlowPathUpdates; - int dedupedPartialNiFiFlowPathUpdates; - int otherMessages; - int flowPathSearched; - int dataSetSearched; - int dataSetCacheHit; - private void log(String message) { - logger.debug(String.format("%s, %d ms passed, totalMessages=%d," + - " partialNiFiFlowPathUpdates=%d, dedupedPartialNiFiFlowPathUpdates=%d, 
otherMessage=%d," + - " flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s," + - " guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d", - message, System.currentTimeMillis() - startedAt, totalMessages, - partialNiFiFlowPathUpdates, dedupedPartialNiFiFlowPathUpdates, otherMessages, - flowPathSearched, dataSetSearched, dataSetCacheHit, - guidToQualifiedName.size(), typedQualifiedNameToRef.size())); - } - } - - public void commitMessages() { - final Map<Boolean, List<HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers - = messages.stream().collect(Collectors.groupingBy(msg - -> ENTITY_PARTIAL_UPDATE.equals(msg.getType()) - && TYPE_NIFI_FLOW_PATH.equals(((EntityPartialUpdateRequest)msg).getTypeName()) - && ATTR_QUALIFIED_NAME.equals(((EntityPartialUpdateRequest)msg).getAttribute()) - )); - - - final List<HookNotificationMessage> otherMessages = partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(false, k -> Collections.emptyList()); - final List<HookNotificationMessage> partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(true, k -> Collections.emptyList()); - logger.info("Commit messages: {} partialNiFiFlowPathUpdate and {} other messages.", partialNiFiFlowPathUpdates.size(), otherMessages.size()); - - final Metrics metrics = new Metrics(); - metrics.totalMessages = messages.size(); - metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size(); - metrics.otherMessages = otherMessages.size(); - - try { - // Notify other messages first. - notifyEntities(otherMessages); - - // De-duplicate messages. - final List<HookNotificationMessage> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (EntityPartialUpdateRequest) msg) - // Group by nifi_flow_path qualifiedName value. 
- .collect(Collectors.groupingBy(EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream() - .map(entry -> { - final String flowPathQualifiedName = entry.getKey(); - final Map<String, Referenceable> distinctInputs; - final Map<String, Referenceable> distinctOutputs; - final String flowPathGuid; - try { - // Fetch existing nifi_flow_path and its inputs/ouputs. - metrics.flowPathSearched++; - final AtlasEntity.AtlasEntityWithExtInfo flowPathExt - = atlasClient.searchEntityDef(new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName)); - final AtlasEntity flowPathEntity = flowPathExt.getEntity(); - flowPathGuid = flowPathEntity.getGuid(); - distinctInputs = toReferenceables(flowPathEntity.getAttribute(ATTR_INPUTS), metrics); - distinctOutputs = toReferenceables(flowPathEntity.getAttribute(ATTR_OUTPUTS), metrics); - - } catch (AtlasServiceException e) { - if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) { - logger.debug("nifi_flow_path was not found for qualifiedName {}", flowPathQualifiedName); - } else { - logger.warn("Failed to retrieve nifi_flow_path with qualifiedName {} due to {}", flowPathQualifiedName, e, e); - } - return null; - } - - // Merge all inputs and outputs for this nifi_flow_path. - for (EntityPartialUpdateRequest msg : entry.getValue()) { - fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics) - .entrySet().stream().filter(ref -> !distinctInputs.containsKey(ref.getKey())) - .forEach(ref -> distinctInputs.put(ref.getKey(), ref.getValue())); - - fromReferenceable(msg.getEntity().get(ATTR_OUTPUTS), metrics) - .entrySet().stream().filter(ref -> !distinctOutputs.containsKey(ref.getKey())) - .forEach(ref -> distinctOutputs.put(ref.getKey(), ref.getValue())); - } - - // Consolidate messages into one. - final Referenceable flowPathRef = new Referenceable(flowPathGuid, TYPE_NIFI_FLOW_PATH, null); - // NOTE: distinctInputs.values() returns HashMap$Values, which causes following error. 
To avoid that, wrap with ArrayList: - // org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable - flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values())); - flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values())); - return new EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, - ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef); - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - metrics.dedupedPartialNiFiFlowPathUpdates = deduplicatedMessages.size(); - notifyEntities(deduplicatedMessages); - } finally { - metrics.log("Committed"); - messages.clear(); - } - } - - /** - * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable keyed by qualifiedName.</p> - * <p>Atlas removes existing references those are not specified when a collection attribute is updated. - * In order to preserve existing DataSet references, existing elements should be passed within a partial update message.</p> - * <p>This method also populates entity cache for subsequent lookups.</p> - * @param _refs Contains references from an existin nifi_flow_path entity inputs or outputs attribute. - * @return A map of Referenceables keyed by qualifiedName. - */ - @SuppressWarnings("unchecked") - private Map<String, Referenceable> toReferenceables(Object _refs, Metrics metrics) { - if (_refs == null) { - // NOTE: This empty map may be used to add new Referenceables. Can not be Collection.emptyMap which does not support addition. - return new HashMap<>(); - } - - final List<Map<String, Object>> refs = (List<Map<String, Object>>) _refs; - return refs.stream().map(ref -> { - // Existing reference should has a GUID. 
- final String typeName = (String) ref.get(ATTR_TYPENAME); - final String guid = (String) ref.get(ATTR_GUID); - - if (guidToQualifiedName.containsKey(guid)) { - metrics.dataSetCacheHit++; - } - - final String refQualifiedName = guidToQualifiedName.computeIfAbsent(guid, k -> { - try { - metrics.dataSetSearched++; - final AtlasEntity.AtlasEntityWithExtInfo refExt = atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName)); - final String qualifiedName = (String) refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME); - typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP)); - return qualifiedName; - } catch (AtlasServiceException e) { - if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) { - logger.warn("{} entity was not found for guid {}", typeName, guid); - } else { - logger.warn("Failed to retrieve {} with guid {} due to {}", typeName, guid, e); - } - return null; - } - }); - - if (refQualifiedName == null) { - return null; - } - return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName))); - }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null) - // If duplication happens, use new value. - .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> { - logger.warn("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue}); - return newValue; - })); - } - - @SuppressWarnings("unchecked") - private Map<String, Referenceable> fromReferenceable(Object _refs, Metrics metrics) { - if (_refs == null) { - return Collections.emptyMap(); - } - - final List<Referenceable> refs = (List<Referenceable>) _refs; - return refs.stream().map(ref -> { - // This ref is created within this reporting cycle, and it may not have GUID assigned yet, if it is a brand new reference. 
- // If cache has the Reference, then use it because instances in the cache are guaranteed to have GUID assigned. - // Brand new Referenceables have to have all mandatory attributes. - final String typeName = ref.getTypeName(); - final Id id = ref.getId(); - final String refQualifiedName = (String) ref.get(ATTR_QUALIFIED_NAME); - final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName); - - final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> { - if (id.isAssigned()) { - // If this referenceable has Guid assigned, then add this one to cache. - guidToQualifiedName.put(id._getId(), refQualifiedName); - typedQualifiedNameToRef.put(typedRefQualifiedName, ref); - } - return ref; - }); - - return new Tuple<>(refQualifiedName, refFromCacheIfAvailable); - }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null) - .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue)); - } - - public void close() { - if (notificationInterface != null) { - notificationInterface.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java new file mode 100644 index 0000000..4916337 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.hook; + +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * This class is not thread-safe as it buffers uncommitted notification messages within the instance; the buffer is cleared after every commit. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ */ +public class NiFiAtlasHook extends AtlasHook implements LineageContext { + + public static final String NIFI_USER = "nifi"; + + private static final String CONF_PREFIX = "atlas.hook.nifi."; + private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + + // A single sender is reused so that its GUID/qualifiedName lookup caches survive across commitMessages() calls. + private final NotificationSender notificationSender = new NotificationSender(); + private final List<HookNotificationMessage> messages = new ArrayList<>(); + + public void setAtlasClient(NiFiAtlasClient atlasClient) { + notificationSender.setAtlasClient(atlasClient); + } + + @Override + protected String getNumberOfRetriesPropertyKey() { + return HOOK_NUM_RETRIES; + } + + @Override + public void addMessage(HookNotificationMessage message) { + messages.add(message); + } + + public void commitMessages() { + try { + notificationSender.send(messages, this::notifyEntities); + } finally { + messages.clear(); // Always clear, even if sending fails, so these messages are never re-sent by a later commit. + } + } + + public void close() { + if (notificationInterface != null) { + notificationInterface.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java new file mode 100644 index 0000000..4d599f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.hook; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.AtlasUtils; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toMap; +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_CREATE; +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; 
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class implements the Atlas hook notification message de-duplication mechanism. + * Separated from {@link NiFiAtlasHook} for better testability. + */ +class NotificationSender { + + private static final Logger logger = LoggerFactory.getLogger(NotificationSender.class); + + private NiFiAtlasClient atlasClient; + + /** + * An LRU index (at most 10,000 entries) to resolve a qualifiedName from a GUID. + */ + private final Map<String, String> guidToQualifiedName; + /** + * An LRU index (at most 1,000 entries) to resolve a Referenceable from a typeName::qualifiedName. + */ + private final Map<String, Referenceable> typedQualifiedNameToRef; + + private static <K, V> Map<K, V> createCache(final int maxSize) { + return new LinkedHashMap<K, V>(maxSize, 0.75f, true) { // access-ordered map that evicts the least recently accessed entry once size exceeds maxSize + @Override + protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { + return size() > maxSize; + } + }; + } + + NotificationSender() { + final int qualifiedNameCacheSize = 10_000; + this.guidToQualifiedName = createCache(qualifiedNameCacheSize); + + final int dataSetRefCacheSize = 1_000; + this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize); + } + + private class Metrics { + final long startedAt = System.currentTimeMillis(); + + /** + * The total number of messages passed to send(). + */ + int totalMessages; + /** + * The number of CreateEntityRequest messages before de-duplication. + */ + int createMessages; + /** + * The number of unique 'nifi_flow_path' entities (by qualifiedName) across CreateEntityRequest messages.
+ */ + int uniqueFlowPathCreates; + /** + * The number of unique entities other than 'nifi_flow_path' across CreateEntityRequest messages. + */ + int uniqueOtherCreates; + int partialNiFiFlowPathUpdates; + int uniquePartialNiFiFlowPathUpdates; + int otherMessages; + int flowPathSearched; + int dataSetSearched; + int dataSetCacheHit; + + private String toLogString(String message) { + return String.format("%s, %d ms passed, totalMessages=%d," + + " createMessages=%d, uniqueFlowPathCreates=%d, uniqueOtherCreates=%d," + + " partialNiFiFlowPathUpdates=%d, uniquePartialNiFiFlowPathUpdates=%d, otherMessage=%d," + + " flowPathSearched=%d, dataSetSearched=%d, dataSetCacheHit=%s," + + " guidToQualifiedName.size=%d, typedQualifiedNameToRef.size=%d", + message, System.currentTimeMillis() - startedAt, totalMessages, + createMessages, uniqueFlowPathCreates, uniqueOtherCreates, + partialNiFiFlowPathUpdates, uniquePartialNiFiFlowPathUpdates, otherMessages, + flowPathSearched, dataSetSearched, dataSetCacheHit, + guidToQualifiedName.size(), typedQualifiedNameToRef.size()); + } + } + + void setAtlasClient(NiFiAtlasClient atlasClient) { + this.atlasClient = atlasClient; + } + + private Predicate<Referenceable> distinctReferenceable() { // stateful filter: accepts only the first Referenceable seen per typeName/qualifiedName pair + final Set<String> keys = new HashSet<>(); + return r -> { + final String key = AtlasUtils.toTypedQualifiedName(r.getTypeName(), (String) r.get(ATTR_QUALIFIED_NAME)); + return keys.add(key); + }; + } + + private <K, V> List<V> safeGet(Map<K, List<V>> map, K key) { // an absent key is mapped to (and stored as) an immutable empty list + return map.computeIfAbsent(key, k -> Collections.emptyList()); + } + + @SuppressWarnings("unchecked") + private void mergeRefs(Referenceable r1, Referenceable r2) { // merges r2's inputs/outputs into r1 in place + r1.set(ATTR_INPUTS, mergeRefs((Collection<Referenceable>) r1.get(ATTR_INPUTS), (Collection<Referenceable>) r2.get(ATTR_INPUTS))); + r1.set(ATTR_OUTPUTS, mergeRefs((Collection<Referenceable>) r1.get(ATTR_OUTPUTS), (Collection<Referenceable>) r2.get(ATTR_OUTPUTS))); + } + + private Collection<Referenceable> mergeRefs(Collection<Referenceable> r1,
Collection<Referenceable> r2) { + final boolean isR1Empty = r1 == null || r1.isEmpty(); + final boolean isR2Empty = r2 == null || r2.isEmpty(); + + if (isR1Empty) { + // r2 may or may not have entities, don't have to merge r1. + return r2; + } else if (isR2Empty) { + // r1 has some entities, don't have to merge r2. + return r1; + } + + // If both have entities, they need to be merged. + return Stream.concat(r1.stream(), r2.stream()).filter(distinctReferenceable()).collect(Collectors.toList()); + } + + /** + * <p>Send hook notification messages. + * In order to notify relationships between 'nifi_flow_path' and its inputs/outputs, this method sends messages in the following order:</p> + * <ol> + * <li>As a single {@link org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest} message: + * <ul> + * <li>New entities except 'nifi_flow_path', including DataSets such as 'nifi_queue', 'kafka_topic' or 'hive_table' ... etc, + * so that 'nifi_flow_path' entities can refer to them</li> + * <li>New 'nifi_flow_path' entities; entity order is guaranteed within a single message</li> + * </ul> + * </li> + * <li>Update 'nifi_flow_path' messages. Before notifying update messages, this method fetches the existing 'nifi_flow_path' entity + * to merge new inputs/outputs elements with existing ones, so that existing ones will not be removed.</li> + * <li>All other messages</li> + * </ol> + * <p>Messages having the same type and qualified name will be de-duplicated before being sent.</p> + * @param messages list of messages to be sent + * @param notifier responsible for sending notification messages, its accept method can be called multiple times + */ + void send(final List<HookNotification.HookNotificationMessage> messages, final Consumer<List<HookNotification.HookNotificationMessage>> notifier) { + final Metrics metrics = new Metrics(); + try { + metrics.totalMessages = messages.size(); + + final Map<Boolean, List<HookNotification.HookNotificationMessage>> createAndOthers =
messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType()))); + + final List<HookNotification.HookNotificationMessage> creates = safeGet(createAndOthers, true); + metrics.createMessages = creates.size(); + + final Map<Boolean, List<Referenceable>> newFlowPathsAndOtherEntities = creates.stream() + .flatMap(msg -> ((HookNotification.EntityCreateRequest) msg).getEntities().stream()) + .collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.typeName))); + + // Deduplicate same entity creation messages. + final List<Referenceable> newEntitiesExceptFlowPaths = safeGet(newFlowPathsAndOtherEntities, false) + .stream().filter(distinctReferenceable()).collect(Collectors.toList()); + + // Deduplicate same flow paths and also merge inputs and outputs + final Collection<Referenceable> newFlowPaths = safeGet(newFlowPathsAndOtherEntities, true).stream() + .collect(toMap(ref -> ref.get(ATTR_QUALIFIED_NAME), ref -> ref, (r1, r2) -> { + // Merge inputs and outputs. + mergeRefs(r1, r2); + return r1; + })).values(); + metrics.uniqueFlowPathCreates = newFlowPaths.size(); + metrics.uniqueOtherCreates = newEntitiesExceptFlowPaths.size(); + + + // 1-1. Notify new entities except 'nifi_flow_path' + // 1-2.
Notify new 'nifi_flow_path' + List<Referenceable> newEntities = new ArrayList<>(); + newEntities.addAll(newEntitiesExceptFlowPaths); + newEntities.addAll(newFlowPaths); + if (!newEntities.isEmpty()) { + notifier.accept(Collections.singletonList(new HookNotification.EntityCreateRequest(NIFI_USER, newEntities))); + } + + final Map<Boolean, List<HookNotification.HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers + = safeGet(createAndOthers, false).stream().collect(groupingBy(msg + -> ENTITY_PARTIAL_UPDATE.equals(msg.getType()) + && TYPE_NIFI_FLOW_PATH.equals(((HookNotification.EntityPartialUpdateRequest)msg).getTypeName()) + && ATTR_QUALIFIED_NAME.equals(((HookNotification.EntityPartialUpdateRequest)msg).getAttribute()) + )); + + + // These updates are made against existing flow path entities. + final List<HookNotification.HookNotificationMessage> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true); + final List<HookNotification.HookNotificationMessage> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false); + metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size(); + metrics.otherMessages = otherMessages.size(); + + + // 2. Notify de-duplicated 'nifi_flow_path' updates + final List<HookNotification.HookNotificationMessage> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotification.EntityPartialUpdateRequest) msg) + // Group by nifi_flow_path qualifiedName value. + .collect(groupingBy(HookNotification.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream() + .map(entry -> { + final String flowPathQualifiedName = entry.getKey(); + final Map<String, Referenceable> distinctInputs; + final Map<String, Referenceable> distinctOutputs; + final String flowPathGuid; + try { + // Fetch existing nifi_flow_path and its inputs/outputs.
+ metrics.flowPathSearched++; + final AtlasEntity.AtlasEntityWithExtInfo flowPathExt + = atlasClient.searchEntityDef(new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName)); + final AtlasEntity flowPathEntity = flowPathExt.getEntity(); + flowPathGuid = flowPathEntity.getGuid(); + distinctInputs = toReferenceables(flowPathEntity.getAttribute(ATTR_INPUTS), metrics); + distinctOutputs = toReferenceables(flowPathEntity.getAttribute(ATTR_OUTPUTS), metrics); + + } catch (AtlasServiceException e) { + if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) { + logger.debug("nifi_flow_path was not found for qualifiedName {}", flowPathQualifiedName); + } else { + logger.warn("Failed to retrieve nifi_flow_path with qualifiedName {} due to {}", flowPathQualifiedName, e, e); + } + return null; + } + + // Merge all inputs and outputs for this nifi_flow_path. + for (HookNotification.EntityPartialUpdateRequest msg : entry.getValue()) { + fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics) + .entrySet().stream().filter(ref -> !distinctInputs.containsKey(ref.getKey())) + .forEach(ref -> distinctInputs.put(ref.getKey(), ref.getValue())); + + fromReferenceable(msg.getEntity().get(ATTR_OUTPUTS), metrics) + .entrySet().stream().filter(ref -> !distinctOutputs.containsKey(ref.getKey())) + .forEach(ref -> distinctOutputs.put(ref.getKey(), ref.getValue())); + } + + // Consolidate messages into one. + final Referenceable flowPathRef = new Referenceable(flowPathGuid, TYPE_NIFI_FLOW_PATH, null); + // NOTE: distinctInputs.values() returns HashMap$Values, which causes the following error.
To avoid that, wrap with ArrayList: + // org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable + flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values())); + flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values())); + return new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, + ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + metrics.uniquePartialNiFiFlowPathUpdates = deduplicatedMessages.size(); + notifier.accept(deduplicatedMessages); + + // 3. Notify other messages + notifier.accept(otherMessages); + + } finally { + logger.info(metrics.toLogString("Finished")); + } + + } + + /** + * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable keyed by qualifiedName.</p> + * <p>Atlas removes existing references that are not specified when a collection attribute is updated. + * In order to preserve existing DataSet references, existing elements should be passed within a partial update message.</p> + * <p>This method also populates the entity caches for subsequent lookups. If the Referenceable for a cached GUID has been evicted from the smaller typedQualifiedNameToRef cache, it is rebuilt from the GUID and type name so that the existing reference is not dropped.</p> + * @param _refs Contains references from an existing nifi_flow_path entity inputs or outputs attribute. + * @return A map of Referenceables keyed by qualifiedName. + */ + @SuppressWarnings("unchecked") + private Map<String, Referenceable> toReferenceables(Object _refs, Metrics metrics) { + if (_refs == null) { + // NOTE: This empty map may be used to add new Referenceables. Cannot be Collections.emptyMap(), which does not support addition. + return new HashMap<>(); + } + + final Collection<Map<String, Object>> refs = (Collection<Map<String, Object>>) _refs; + return refs.stream().map(ref -> { + // An existing reference should have a GUID.
+ final String typeName = (String) ref.get(ATTR_TYPENAME); + final String guid = (String) ref.get(ATTR_GUID); + + if (guidToQualifiedName.containsKey(guid)) { + metrics.dataSetCacheHit++; + } + + final String refQualifiedName = guidToQualifiedName.computeIfAbsent(guid, k -> { + try { + metrics.dataSetSearched++; + final AtlasEntity.AtlasEntityWithExtInfo refExt = atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName)); + final String qualifiedName = (String) refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME); + typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP)); + return qualifiedName; + } catch (AtlasServiceException e) { + if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) { + logger.warn("{} entity was not found for guid {}", typeName, guid); + } else { + logger.warn("Failed to retrieve {} with guid {} due to {}", typeName, guid, e); + } + return null; + } + }); + + if (refQualifiedName == null) { + return null; + } + return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.computeIfAbsent(toTypedQualifiedName(typeName, refQualifiedName), k -> new Referenceable(guid, typeName, Collections.EMPTY_MAP))); + }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null) + // If duplication happens, use new value. + .collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> { + logger.debug("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue}); + return newValue; + })); + } + + @SuppressWarnings("unchecked") + private Map<String, Referenceable> fromReferenceable(Object _refs, Metrics metrics) { + if (_refs == null) { + return Collections.emptyMap(); + } + + final Collection<Referenceable> refs = (Collection<Referenceable>) _refs; + return refs.stream().map(ref -> { + // This ref is created within this reporting cycle, and it may not have GUID assigned yet, if it is a brand new reference.
+ // If cache has the Reference, then use it because instances in the cache are guaranteed to have GUID assigned. + // Brand new Referenceables have to have all mandatory attributes. + final String typeName = ref.getTypeName(); + final Id id = ref.getId(); + final String refQualifiedName = (String) ref.get(ATTR_QUALIFIED_NAME); + final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName); + + final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> { + if (id.isAssigned()) { + // If this referenceable has Guid assigned, then add this one to cache. + guidToQualifiedName.put(id._getId(), refQualifiedName); + typedQualifiedNameToRef.put(typedRefQualifiedName, ref); + } + return ref; + }); + + return new Tuple<>(refQualifiedName, refFromCacheIfAvailable); + }).filter(tuple -> tuple.getValue() != null) + .collect(toMap(Tuple::getKey, Tuple::getValue)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java index 11d6e8b..a253c7d 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java @@ -40,7 +40,7 @@ import java.util.stream.Collectors; import static org.apache.nifi.atlas.AtlasUtils.toStr; import static 
org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; -import static org.apache.nifi.atlas.NiFiAtlasHook.NIFI_USER; +import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER; import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW; @@ -136,7 +136,7 @@ public abstract class AbstractLineageStrategy implements LineageStrategy { } @SuppressWarnings("unchecked") - private boolean addDataSetRefs(Set<Referenceable> refsToAdd, Referenceable nifiFlowPath, String targetAttribute) { + protected boolean addDataSetRefs(Set<Referenceable> refsToAdd, Referenceable nifiFlowPath, String targetAttribute) { if (refsToAdd != null && !refsToAdd.isEmpty()) { // If nifiFlowPath already has a given dataSetRef, then it needs not to be created. http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java index 4437bfc..d3ac658 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java @@ -42,7 +42,9 @@ import java.util.zip.CRC32; import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName; import static org.apache.nifi.atlas.AtlasUtils.toStr; import static
org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; import static org.apache.nifi.provenance.ProvenanceEventType.DROP; @@ -76,8 +78,36 @@ public class CompleteFlowPathLineage extends AbstractLineageStrategy { createCompleteFlowPath(nifiFlow, lineagePath, createdFlowPaths); for (Tuple<NiFiFlowPath, DataSetRefs> createdFlowPath : createdFlowPaths) { final NiFiFlowPath flowPath = createdFlowPath.getKey(); - createEntity(toReferenceable(flowPath, nifiFlow)); + // NOTE 1: FlowPath creation and DataSet references should be reported separately + // ------------------------------------------------------------------------------ + // For example, with the following provenance events: + // CREATE(F1), FORK (F1 -> F2, F3), DROP(F1), SEND (F2), SEND(F3), DROP(F2), DROP(F3), + // there is no guarantee that DROP(F2) and DROP(F3) are processed within the same cycle. + // If DROP(F3) is processed in a different cycle, it needs to be added to the existing FlowPath + // that contains F1 -> F2, to be F1 -> F2, F3. + // Execution cycle 1: Path1 (source of F1 -> ForkA), ForkA_queue (F1 -> F2), Path2 (ForkA -> dest of F2) + // Execution cycle 2: Path1 (source of F1 -> ForkB), ForkB_queue (F1 -> F3), Path3 (ForkB -> dest of F3) + + // NOTE 2: Both FlowPath creation and FlowPath update messages are required + // ------------------------------------------------------------------------ + // For the 1st time when a lineage is found, nifi_flow_path and referred DataSets are created.
+ // If we notify these entities by a single create message for 3 entities (Path1, DataSet1, DataSet2) + // followed by 1 partial update message to add lineage (DataSet1 -> Path1 -> DataSet2), then + // the update message may arrive at Atlas before the create message gets processed. + // If that happens, lineage among these entities will be missed. + // But as written in NOTE 1, there is a case where existing nifi_flow_paths need to be updated. + // Also, we don't know whether this is the 1st time or a later one. + // So, we need to send both an entity creation message and a partial update message. + + // Create flow path entity with DataSet refs. + final Referenceable flowPathRef = toReferenceable(flowPath, nifiFlow); + final DataSetRefs dataSetRefs = createdFlowPath.getValue(); + addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS); + addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS); + createEntity(flowPathRef); + // Also send a partial update message, in case this nifi_flow_path already exists in Atlas.
addDataSetRefs(nifiFlow, Collections.singleton(flowPath), createdFlowPath.getValue()); + } createdFlowPaths.clear(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 29ae013..29926ba 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -56,8 +56,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.atlas.hook.NiFiAtlasHook; import org.apache.nifi.atlas.NiFiAtlasClient; -import org.apache.nifi.atlas.NiFiAtlasHook; import org.apache.nifi.atlas.NiFiFlow; import org.apache.nifi.atlas.NiFiFlowAnalyzer; import org.apache.nifi.atlas.provenance.AnalysisContext; http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java new file mode 100644 index 0000000..8b356e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.atlas.hook; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.AtlasUtils; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_DATASET; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; +import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestNotificationSender { + + private static final Logger logger = LoggerFactory.getLogger(TestNotificationSender.class); + + private static class Notifier implements Consumer<List<HookNotification.HookNotificationMessage>> { + private final
List<List<HookNotification.HookNotificationMessage>> notifications = new ArrayList<>(); + @Override + public void accept(List<HookNotification.HookNotificationMessage> messages) { + logger.info("notified at {}, {}", notifications.size(), messages); + notifications.add(messages); + } + } + + @Test + public void testZeroMessage() { + final NotificationSender sender = new NotificationSender(); + final List<HookNotification.HookNotificationMessage> messages = Collections.emptyList(); + final Notifier notifier = new Notifier(); + sender.send(messages, notifier); + assertEquals(0, notifier.notifications.get(0).size()); + assertEquals(0, notifier.notifications.get(1).size()); + } + + private Referenceable createRef(String type, String qname) { + final Referenceable ref = new Referenceable(type); + ref.set(ATTR_QUALIFIED_NAME, qname); + return ref; + } + + @SuppressWarnings("unchecked") + private void assertCreateMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) { + assertTrue(notifier.notifications.size() > notificationIndex); + final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex); + assertEquals(1, messages.size()); + final HookNotification.EntityCreateRequest message = (HookNotification.EntityCreateRequest) messages.get(0); + assertEquals(expects.length, message.getEntities().size()); + + // The use of 'flatMap' at NotificationSender does not preserve actual entities order. + // Use typed qname map to assert regardless of ordering.
+ final Map<String, Referenceable> entities = message.getEntities().stream().collect(Collectors.toMap( + ref -> AtlasUtils.toTypedQualifiedName(ref.getTypeName(), (String) ref.get(ATTR_QUALIFIED_NAME)), ref -> ref)); + // Verify the ACTUAL creation order (not the order of 'expects'): no other type may follow a nifi_flow_path entity. + boolean hasFlowPathSeen = false; + for (Referenceable created : message.getEntities()) { + final boolean isFlowPath = TYPE_NIFI_FLOW_PATH.equals(created.getTypeName()); + assertFalse("Types other than nifi_flow_path should be created before any nifi_flow_path entity.", hasFlowPathSeen && !isFlowPath); + hasFlowPathSeen |= isFlowPath; + } + for (Referenceable expect : expects) { + final String typeName = expect.getTypeName(); + final Referenceable actual = entities.get(AtlasUtils.toTypedQualifiedName(typeName, (String) expect.get(ATTR_QUALIFIED_NAME))); + assertNotNull(actual); + assertEquals(typeName, actual.getTypeName()); + assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.get(ATTR_QUALIFIED_NAME)); + if (TYPE_NIFI_FLOW_PATH.equals(typeName)) { + assertIOReferences(expect, actual, ATTR_INPUTS); + assertIOReferences(expect, actual, ATTR_OUTPUTS); + } + } + } + + @SuppressWarnings("unchecked") + private void assertIOReferences(Referenceable expect, Referenceable actual, String attrName) { + final Collection<Referenceable> expectedRefs = (Collection<Referenceable>) expect.get(attrName); + if (expectedRefs != null) { + final Collection<Referenceable> actualRefs = (Collection<Referenceable>) actual.get(attrName); + assertEquals(expectedRefs.size(), actualRefs.size()); + final Iterator<Referenceable> actualIterator = actualRefs.iterator(); + for (Referenceable expectedRef : expectedRefs) { + final Referenceable actualRef = actualIterator.next(); + assertEquals(expectedRef.getTypeName(), actualRef.getTypeName()); + assertEquals(expectedRef.get(ATTR_QUALIFIED_NAME), actualRef.get(ATTR_QUALIFIED_NAME)); + } + } + } + + 
@SuppressWarnings("unchecked") + private void assertUpdateFlowPathMessage(Notifier notifier, int notificationIndex, Referenceable ...
expects) { + assertTrue(notifier.notifications.size() > notificationIndex); + final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex); + assertEquals(expects.length, messages.size()); + for (int i = 0; i < expects.length; i++) { + final Referenceable expect = expects[i]; + final HookNotification.EntityPartialUpdateRequest actual = (HookNotification.EntityPartialUpdateRequest) messages.get(i); + assertEquals(expect.getTypeName(), actual.getTypeName()); + assertEquals(ATTR_QUALIFIED_NAME, actual.getAttribute()); + assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.getAttributeValue()); + + final Collection<?> expIn = (Collection<?>) expect.get(ATTR_INPUTS); + final Collection<?> expOut = (Collection<?>) expect.get(ATTR_OUTPUTS); + assertTrue(expIn.containsAll((Collection<?>) actual.getEntity().get(ATTR_INPUTS))); + assertTrue(expOut.containsAll((Collection<?>) actual.getEntity().get(ATTR_OUTPUTS))); + } + } + + @Test + public void testOneCreateDataSetMessage() { + final NotificationSender sender = new NotificationSender(); + final Referenceable queue1 = createRef(TYPE_NIFI_QUEUE, "queue1@test"); + final List<HookNotification.HookNotificationMessage> messages = Collections.singletonList( + new HookNotification.EntityCreateRequest(NIFI_USER, queue1)); + final Notifier notifier = new Notifier(); + sender.send(messages, notifier); + + // EntityCreateRequest containing nifi_queue. + assertCreateMessage(notifier, 0, queue1); + } + + @Test + public void testCreateDataSetMessageDeduplication() { + // Simulating a complete path where 4 FlowFiles went through the same partial flow. + // FF1 was ingested from data1, and so was FF2 from data2. + // Then FF1 was FORKed to FF11 and FF12, then sent to data11 and data12. + // Similarly FF2 was FORKed to FF21 and FF22, then sent to data21 and data22. + // FF3 went through the same path as FF1, and FF4 the same as FF2.
+ // All of those provenance events were processed within a single ReportLineageToAtlas cycle. + + // FF1: data1 -> pathA1 -> FORK11 (FF11) -> pathB11 -> data11 + // -> FORK12 (FF12) -> pathB12 -> data12 + // FF2: data2 -> pathA2 -> FORK21 (FF21) -> pathB21 -> data21 + // -> FORK22 (FF22) -> pathB22 -> data22 + // FF3: data1 -> pathA1 -> FORK11 (FF31) -> pathB11 -> data11 + // -> FORK12 (FF32) -> pathB12 -> data12 + // FF4: data2 -> pathA2 -> FORK21 (FF41) -> pathB21 -> data21 + // -> FORK22 (FF42) -> pathB22 -> data22 + + // As a result, the following lineages are reported to Atlas: + // data1 -> pathA1 -> FORK11 -> pathB11 -> data11 + // -> FORK12 -> pathB12 -> data12 + // data2 -> pathA2 -> FORK21 -> pathB21 -> data21 + // -> FORK22 -> pathB22 -> data22 + + final NotificationSender sender = new NotificationSender(); + + // From FF1 + final Referenceable ff1_data1 = createRef(TYPE_DATASET, "data1@test"); + final Referenceable ff1_data11 = createRef(TYPE_DATASET, "data11@test"); + final Referenceable ff1_data12 = createRef(TYPE_DATASET, "data12@test"); + final Referenceable ff1_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test"); + final Referenceable ff1_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test"); + final Referenceable ff1_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test"); + final Referenceable ff1_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test"); + final Referenceable ff1_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test"); + final Referenceable ff1_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test"); + // From FF11 + ff1_pathA11.set(ATTR_INPUTS, singleton(ff1_data1)); + ff1_pathA11.set(ATTR_OUTPUTS, singleton(ff1_fork11)); + ff1_pathB11.set(ATTR_INPUTS, singleton(ff1_fork11)); + ff1_pathB11.set(ATTR_OUTPUTS, singleton(ff1_data11)); + // From FF12 + ff1_pathA12.set(ATTR_INPUTS, singleton(ff1_data1)); + ff1_pathA12.set(ATTR_OUTPUTS, singleton(ff1_fork12)); + ff1_pathB12.set(ATTR_INPUTS, singleton(ff1_fork12)); +
ff1_pathB12.set(ATTR_OUTPUTS, singleton(ff1_data12)); + + // From FF2 + final Referenceable ff2_data2 = createRef(TYPE_DATASET, "data2@test"); + final Referenceable ff2_data21 = createRef(TYPE_DATASET, "data21@test"); + final Referenceable ff2_data22 = createRef(TYPE_DATASET, "data22@test"); + final Referenceable ff2_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test"); + final Referenceable ff2_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test"); + final Referenceable ff2_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test"); + final Referenceable ff2_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test"); + final Referenceable ff2_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test"); + final Referenceable ff2_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test"); + // From FF21 + ff2_pathA21.set(ATTR_INPUTS, singleton(ff2_data2)); + ff2_pathA21.set(ATTR_OUTPUTS, singleton(ff2_fork21)); + ff2_pathB21.set(ATTR_INPUTS, singleton(ff2_fork21)); + ff2_pathB21.set(ATTR_OUTPUTS, singleton(ff2_data21)); + // From FF22 + ff2_pathA22.set(ATTR_INPUTS, singleton(ff2_data2)); + ff2_pathA22.set(ATTR_OUTPUTS, singleton(ff2_fork22)); + ff2_pathB22.set(ATTR_INPUTS, singleton(ff2_fork22)); + ff2_pathB22.set(ATTR_OUTPUTS, singleton(ff2_data22)); + + // From FF3 + final Referenceable ff3_data1 = createRef(TYPE_DATASET, "data1@test"); + final Referenceable ff3_data11 = createRef(TYPE_DATASET, "data11@test"); + final Referenceable ff3_data12 = createRef(TYPE_DATASET, "data12@test"); + final Referenceable ff3_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test"); + final Referenceable ff3_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test"); + final Referenceable ff3_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test"); + final Referenceable ff3_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test"); + final Referenceable ff3_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test"); + final Referenceable ff3_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test"); + //
From FF31 + ff3_pathA11.set(ATTR_INPUTS, singleton(ff3_data1)); + ff3_pathA11.set(ATTR_OUTPUTS, singleton(ff3_fork11)); + ff3_pathB11.set(ATTR_INPUTS, singleton(ff3_fork11)); + ff3_pathB11.set(ATTR_OUTPUTS, singleton(ff3_data11)); + // From FF32 + ff3_pathA12.set(ATTR_INPUTS, singleton(ff3_data1)); + ff3_pathA12.set(ATTR_OUTPUTS, singleton(ff3_fork12)); + ff3_pathB12.set(ATTR_INPUTS, singleton(ff3_fork12)); + ff3_pathB12.set(ATTR_OUTPUTS, singleton(ff3_data12)); + + // From FF4 + final Referenceable ff4_data2 = createRef(TYPE_DATASET, "data2@test"); + final Referenceable ff4_data21 = createRef(TYPE_DATASET, "data21@test"); + final Referenceable ff4_data22 = createRef(TYPE_DATASET, "data22@test"); + final Referenceable ff4_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test"); + final Referenceable ff4_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test"); + final Referenceable ff4_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test"); + final Referenceable ff4_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test"); + final Referenceable ff4_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test"); + final Referenceable ff4_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, "B::0022@test"); + // From FF41 + ff4_pathA21.set(ATTR_INPUTS, singleton(ff4_data2)); + ff4_pathA21.set(ATTR_OUTPUTS, singleton(ff4_fork21)); + ff4_pathB21.set(ATTR_INPUTS, singleton(ff4_fork21)); + ff4_pathB21.set(ATTR_OUTPUTS, singleton(ff4_data21)); + // From FF42 + ff4_pathA22.set(ATTR_INPUTS, singleton(ff4_data2)); + ff4_pathA22.set(ATTR_OUTPUTS, singleton(ff4_fork22)); + ff4_pathB22.set(ATTR_INPUTS, singleton(ff4_fork22)); + ff4_pathB22.set(ATTR_OUTPUTS, singleton(ff4_data22)); + + + final List<HookNotification.HookNotificationMessage> messages = asList( + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data1), + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data12), + new
HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA12), + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork12), + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB12), + + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data2), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data22), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA22), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork22), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB22), + + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data1), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data12), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA12), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork12), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB11), + new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB12), + + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data2), + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data22), + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA22), + new
HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork22), + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB21), + new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB22) + ); + + final Notifier notifier = new Notifier(); + sender.send(messages, notifier); + + // EntityCreateRequest, same entities get de-duplicated. nifi_flow_path is created after other types. + final Referenceable r_data1 = createRef(TYPE_DATASET, "data1@test"); + final Referenceable r_data11 = createRef(TYPE_DATASET, "data11@test"); + final Referenceable r_data12 = createRef(TYPE_DATASET, "data12@test"); + final Referenceable r_pathA1 = createRef(TYPE_NIFI_FLOW_PATH, "A::0001@test"); + final Referenceable r_fork11 = createRef(TYPE_NIFI_QUEUE, "B::0011@test"); + final Referenceable r_fork12 = createRef(TYPE_NIFI_QUEUE, "B::0012@test"); + final Referenceable r_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, "B::0011@test"); + final Referenceable r_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, "B::0012@test"); + r_pathA1.set(ATTR_INPUTS, singleton(r_data1)); + r_pathA1.set(ATTR_OUTPUTS, asList(r_fork11, r_fork12)); + r_pathB11.set(ATTR_INPUTS, singleton(r_fork11)); + r_pathB11.set(ATTR_OUTPUTS, singleton(r_data11)); + r_pathB12.set(ATTR_INPUTS, singleton(r_fork12)); + r_pathB12.set(ATTR_OUTPUTS, singleton(r_data12)); + + final Referenceable r_data2 = createRef(TYPE_DATASET, "data2@test"); + final Referenceable r_data21 = createRef(TYPE_DATASET, "data21@test"); + final Referenceable r_data22 = createRef(TYPE_DATASET, "data22@test"); + final Referenceable r_pathA2 = createRef(TYPE_NIFI_FLOW_PATH, "A::0002@test"); + final Referenceable r_fork21 = createRef(TYPE_NIFI_QUEUE, "B::0021@test"); + final Referenceable r_fork22 = createRef(TYPE_NIFI_QUEUE, "B::0022@test"); + final Referenceable r_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, "B::0021@test"); + final Referenceable r_pathB22 = createRef(TYPE_NIFI_FLOW_PATH,
"B::0022@test"); + r_pathA2.set(ATTR_INPUTS, singleton(r_data2)); + r_pathA2.set(ATTR_OUTPUTS, asList(r_fork21, r_fork22)); + r_pathB21.set(ATTR_INPUTS, singleton(r_fork21)); + r_pathB21.set(ATTR_OUTPUTS, singleton(r_data21)); + r_pathB22.set(ATTR_INPUTS, singleton(r_fork22)); + r_pathB22.set(ATTR_OUTPUTS, singleton(r_data22)); + + assertCreateMessage(notifier, 0, + r_data1, r_data11, r_data12, r_fork11, r_fork12, + r_data2, r_data21, r_data22, r_fork21, r_fork22, + r_pathA1, r_pathB11, r_pathB12, + r_pathA2, r_pathB21, r_pathB22); + } + + private Map<String, String> createGuidReference(String type, String guid) { + Map<String, String> map = new HashMap<>(); + map.put(ATTR_TYPENAME, type); + map.put(ATTR_GUID, guid); + return map; + } + + @Test + public void testUpdateFlowPath() throws AtlasServiceException { + final NotificationSender sender = new NotificationSender(); + final Referenceable fileC = createRef("fs_path", "/tmp/file-c.txt@test"); + final Referenceable fileD = createRef("fs_path", "/tmp/file-d.txt@test"); + + // New in/out fileC and fileD are found for path1.
+ final Referenceable newPath1Lineage = createRef(TYPE_NIFI_FLOW_PATH, "path1@test"); + newPath1Lineage.set(ATTR_INPUTS, singleton(fileC)); + newPath1Lineage.set(ATTR_OUTPUTS, singleton(fileD)); + + final List<HookNotification.HookNotificationMessage> messages = asList( + new HookNotification.EntityCreateRequest(NIFI_USER, fileC), + new HookNotification.EntityCreateRequest(NIFI_USER, fileD), + new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage) + ); + + final NiFiAtlasClient atlasClient = mock(NiFiAtlasClient.class); + sender.setAtlasClient(atlasClient); + + // Existing nifi_flow_path + final AtlasEntity path1Entity = new AtlasEntity(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test"); + path1Entity.setGuid("path1-guid"); + path1Entity.setAttribute(ATTR_INPUTS, singleton(createGuidReference("fs_path", "fileA-guid"))); + path1Entity.setAttribute(ATTR_OUTPUTS, singleton(createGuidReference("fs_path", "fileB-guid"))); + + final AtlasEntity fileAEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-a.txt@test"); + fileAEntity.setGuid("fileA-guid"); + + final AtlasEntity fileBEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-b.txt@test"); + fileBEntity.setGuid("fileB-guid"); + + final AtlasEntity.AtlasEntityWithExtInfo path1Ext = new AtlasEntity.AtlasEntityWithExtInfo(path1Entity); + final AtlasEntity.AtlasEntityWithExtInfo fileAExt = new AtlasEntity.AtlasEntityWithExtInfo(fileAEntity); + final AtlasEntity.AtlasEntityWithExtInfo fileBExt = new AtlasEntity.AtlasEntityWithExtInfo(fileBEntity); + when(atlasClient.searchEntityDef(eq(new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME,"path1@test")))).thenReturn(path1Ext); + when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileA-guid")))).thenReturn(fileAExt); + when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileB-guid")))).thenReturn(fileBExt); + + final Notifier notifier = new Notifier(); +
sender.send(messages, notifier); + + assertCreateMessage(notifier, 0, fileC, fileD); + final Referenceable updatedPath1 = createRef(TYPE_NIFI_FLOW_PATH, "path1@test"); + updatedPath1.set(ATTR_INPUTS, asList(new Referenceable("fileA-guid", "fs_path", Collections.emptyMap()), fileC)); + updatedPath1.set(ATTR_OUTPUTS, asList(new Referenceable("fileB-guid", "fs_path", Collections.emptyMap()), fileD)); + assertUpdateFlowPathMessage(notifier, 1, updatedPath1); + } + +}
