Repository: nifi
Updated Branches:
  refs/heads/master ed9263811 -> 79a7014a9


NIFI-4971: ReportLineageToAtlas complete path can miss one-time lineages

- Moved Hook message de-duplication logic from NiFiAtlasHook into
NotificationSender
- NiFiAtlasHook used to send an individual CREATE_ENTITY message for each
entity. This commit bundles all new entities into a single
CREATE_ENTITY message to preserve entity creation order, so that new DataSet
entities can be referenced by new nifi_flow_path entities
- Added more unit tests

This closes #2542

Signed-off-by: Mike Thomsen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/79a7014a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/79a7014a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/79a7014a

Branch: refs/heads/master
Commit: 79a7014a95dc3087f88248c732fb1e4ad8e6e128
Parents: ed92638
Author: Koji Kawamura <[email protected]>
Authored: Fri Mar 9 09:27:20 2018 +0900
Committer: Mike Thomsen <[email protected]>
Committed: Tue May 15 05:50:26 2018 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/atlas/NiFiAtlasHook.java    | 300 -------------
 .../apache/nifi/atlas/hook/NiFiAtlasHook.java   |  68 +++
 .../nifi/atlas/hook/NotificationSender.java     | 392 +++++++++++++++++
 .../lineage/AbstractLineageStrategy.java        |   4 +-
 .../lineage/CompleteFlowPathLineage.java        |  32 +-
 .../atlas/reporting/ReportLineageToAtlas.java   |   2 +-
 .../nifi/atlas/hook/TestNotificationSender.java | 417 +++++++++++++++++++
 7 files changed, 911 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
deleted file mode 100644
index 58945d5..0000000
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.atlas;
-
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
-import 
org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
-import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.nifi.atlas.provenance.lineage.LineageContext;
-import org.apache.nifi.util.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
-import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
-import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
-
-/**
- * This class is not thread-safe as it holds uncommitted notification messages 
within instance.
- * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} 
should be used serially from a single thread.
- */
-public class NiFiAtlasHook extends AtlasHook implements LineageContext {
-
-    public static final String NIFI_USER = "nifi";
-
-    private static final Logger logger = 
LoggerFactory.getLogger(NiFiAtlasHook.class);
-    private static final String CONF_PREFIX = "atlas.hook.nifi.";
-    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-
-    private NiFiAtlasClient atlasClient;
-
-    /**
-     * An index to resolve a qualifiedName from a GUID.
-     */
-    private final Map<String, String> guidToQualifiedName;
-    /**
-     * An index to resolve a Referenceable from a typeName::qualifiedName.
-     */
-    private final Map<String, Referenceable> typedQualifiedNameToRef;
-
-
-    private static <K, V> Map<K, V> createCache(final int maxSize) {
-        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                return size() > maxSize;
-            }
-        };
-    }
-
-    public NiFiAtlasHook() {
-        final int qualifiedNameCacheSize = 10_000;
-        this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
-
-        final int dataSetRefCacheSize = 1_000;
-        this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
-    }
-
-    public void setAtlasClient(NiFiAtlasClient atlasClient) {
-        this.atlasClient = atlasClient;
-    }
-
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
-
-    private final List<HookNotificationMessage> messages = new ArrayList<>();
-
-    @Override
-    public void addMessage(HookNotificationMessage message) {
-        messages.add(message);
-    }
-
-    private class Metrics {
-        final long startedAt = System.currentTimeMillis();
-        int totalMessages;
-        int partialNiFiFlowPathUpdates;
-        int dedupedPartialNiFiFlowPathUpdates;
-        int otherMessages;
-        int flowPathSearched;
-        int dataSetSearched;
-        int dataSetCacheHit;
-        private void log(String message) {
-            logger.debug(String.format("%s, %d ms passed, totalMessages=%d," +
-                    " partialNiFiFlowPathUpdates=%d, 
dedupedPartialNiFiFlowPathUpdates=%d, otherMessage=%d," +
-                    " flowPathSearched=%d, dataSetSearched=%d, 
dataSetCacheHit=%s," +
-                    " guidToQualifiedName.size=%d, 
typedQualifiedNameToRef.size=%d",
-                    message, System.currentTimeMillis() - startedAt, 
totalMessages,
-                    partialNiFiFlowPathUpdates, 
dedupedPartialNiFiFlowPathUpdates, otherMessages,
-                    flowPathSearched, dataSetSearched, dataSetCacheHit,
-                    guidToQualifiedName.size(), 
typedQualifiedNameToRef.size()));
-        }
-    }
-
-    public void commitMessages() {
-        final Map<Boolean, List<HookNotificationMessage>> 
partialNiFiFlowPathUpdateAndOthers
-                = messages.stream().collect(Collectors.groupingBy(msg
-                    -> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
-                        && 
TYPE_NIFI_FLOW_PATH.equals(((EntityPartialUpdateRequest)msg).getTypeName())
-                        && 
ATTR_QUALIFIED_NAME.equals(((EntityPartialUpdateRequest)msg).getAttribute())
-        ));
-
-
-        final List<HookNotificationMessage> otherMessages = 
partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(false, k -> 
Collections.emptyList());
-        final List<HookNotificationMessage> partialNiFiFlowPathUpdates = 
partialNiFiFlowPathUpdateAndOthers.computeIfAbsent(true, k -> 
Collections.emptyList());
-        logger.info("Commit messages: {} partialNiFiFlowPathUpdate and {} 
other messages.", partialNiFiFlowPathUpdates.size(), otherMessages.size());
-
-        final Metrics metrics = new Metrics();
-        metrics.totalMessages = messages.size();
-        metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
-        metrics.otherMessages = otherMessages.size();
-
-        try {
-            // Notify other messages first.
-            notifyEntities(otherMessages);
-
-            // De-duplicate messages.
-            final List<HookNotificationMessage> deduplicatedMessages = 
partialNiFiFlowPathUpdates.stream().map(msg -> (EntityPartialUpdateRequest) msg)
-                    // Group by nifi_flow_path qualifiedName value.
-                    
.collect(Collectors.groupingBy(EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
-                    .map(entry -> {
-                        final String flowPathQualifiedName = entry.getKey();
-                        final Map<String, Referenceable> distinctInputs;
-                        final Map<String, Referenceable> distinctOutputs;
-                        final String flowPathGuid;
-                        try {
-                            // Fetch existing nifi_flow_path and its 
inputs/ouputs.
-                            metrics.flowPathSearched++;
-                            final AtlasEntity.AtlasEntityWithExtInfo 
flowPathExt
-                                    = atlasClient.searchEntityDef(new 
AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName));
-                            final AtlasEntity flowPathEntity = 
flowPathExt.getEntity();
-                            flowPathGuid = flowPathEntity.getGuid();
-                            distinctInputs = 
toReferenceables(flowPathEntity.getAttribute(ATTR_INPUTS), metrics);
-                            distinctOutputs = 
toReferenceables(flowPathEntity.getAttribute(ATTR_OUTPUTS), metrics);
-
-                        } catch (AtlasServiceException e) {
-                            if 
(ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
-                                logger.debug("nifi_flow_path was not found for 
qualifiedName {}", flowPathQualifiedName);
-                            } else {
-                                logger.warn("Failed to retrieve nifi_flow_path 
with qualifiedName {} due to {}", flowPathQualifiedName, e, e);
-                            }
-                            return null;
-                        }
-
-                        // Merge all inputs and outputs for this 
nifi_flow_path.
-                        for (EntityPartialUpdateRequest msg : 
entry.getValue()) {
-                            
fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics)
-                                    .entrySet().stream().filter(ref -> 
!distinctInputs.containsKey(ref.getKey()))
-                                    .forEach(ref -> 
distinctInputs.put(ref.getKey(), ref.getValue()));
-
-                            
fromReferenceable(msg.getEntity().get(ATTR_OUTPUTS), metrics)
-                                    .entrySet().stream().filter(ref -> 
!distinctOutputs.containsKey(ref.getKey()))
-                                    .forEach(ref -> 
distinctOutputs.put(ref.getKey(), ref.getValue()));
-                        }
-
-                        // Consolidate messages into one.
-                        final Referenceable flowPathRef = new 
Referenceable(flowPathGuid, TYPE_NIFI_FLOW_PATH, null);
-                        // NOTE: distinctInputs.values() returns 
HashMap$Values, which causes following error. To avoid that, wrap with 
ArrayList:
-                        // org.json4s.package$MappingException: Can't find 
ScalaSig for class org.apache.atlas.typesystem.Referenceable
-                        flowPathRef.set(ATTR_INPUTS, new 
ArrayList<>(distinctInputs.values()));
-                        flowPathRef.set(ATTR_OUTPUTS, new 
ArrayList<>(distinctOutputs.values()));
-                        return new EntityPartialUpdateRequest(NIFI_USER, 
TYPE_NIFI_FLOW_PATH,
-                                ATTR_QUALIFIED_NAME, flowPathQualifiedName, 
flowPathRef);
-                    })
-                    .filter(Objects::nonNull)
-                    .collect(Collectors.toList());
-
-            metrics.dedupedPartialNiFiFlowPathUpdates = 
deduplicatedMessages.size();
-            notifyEntities(deduplicatedMessages);
-        } finally {
-            metrics.log("Committed");
-            messages.clear();
-        }
-    }
-
-    /**
-     * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable 
keyed by qualifiedName.</p>
-     * <p>Atlas removes existing references those are not specified when a 
collection attribute is updated.
-     * In order to preserve existing DataSet references, existing elements 
should be passed within a partial update message.</p>
-     * <p>This method also populates entity cache for subsequent lookups.</p>
-     * @param _refs Contains references from an existin nifi_flow_path entity 
inputs or outputs attribute.
-     * @return A map of Referenceables keyed by qualifiedName.
-     */
-    @SuppressWarnings("unchecked")
-    private Map<String, Referenceable> toReferenceables(Object _refs, Metrics 
metrics) {
-        if (_refs == null) {
-            // NOTE: This empty map may be used to add new Referenceables. Can 
not be Collection.emptyMap which does not support addition.
-            return new HashMap<>();
-        }
-
-        final List<Map<String, Object>> refs = (List<Map<String, Object>>) 
_refs;
-        return refs.stream().map(ref -> {
-            // Existing reference should has a GUID.
-            final String typeName = (String) ref.get(ATTR_TYPENAME);
-            final String guid = (String) ref.get(ATTR_GUID);
-
-            if (guidToQualifiedName.containsKey(guid)) {
-                metrics.dataSetCacheHit++;
-            }
-
-            final String refQualifiedName = 
guidToQualifiedName.computeIfAbsent(guid, k -> {
-                try {
-                    metrics.dataSetSearched++;
-                    final AtlasEntity.AtlasEntityWithExtInfo refExt = 
atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
-                    final String qualifiedName = (String) 
refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
-                    typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, 
qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP));
-                    return qualifiedName;
-                } catch (AtlasServiceException e) {
-                    if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) 
{
-                        logger.warn("{} entity was not found for guid {}", 
typeName, guid);
-                    } else {
-                        logger.warn("Failed to retrieve {} with guid {} due to 
{}", typeName, guid, e);
-                    }
-                    return null;
-                }
-            });
-
-            if (refQualifiedName == null) {
-                return null;
-            }
-            return new Tuple<>(refQualifiedName, 
typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
-        }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
-                // If duplication happens, use new value.
-                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, 
(oldValue, newValue) -> {
-                    logger.warn("Duplicated qualified name was found, use the 
new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue});
-                    return newValue;
-                }));
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<String, Referenceable> fromReferenceable(Object _refs, Metrics 
metrics) {
-        if (_refs == null) {
-            return Collections.emptyMap();
-        }
-
-        final List<Referenceable> refs = (List<Referenceable>) _refs;
-        return refs.stream().map(ref -> {
-            // This ref is created within this reporting cycle, and it may not 
have GUID assigned yet, if it is a brand new reference.
-            // If cache has the Reference, then use it because instances in 
the cache are guaranteed to have GUID assigned.
-            // Brand new Referenceables have to have all mandatory attributes.
-            final String typeName = ref.getTypeName();
-            final Id id = ref.getId();
-            final String refQualifiedName = (String) 
ref.get(ATTR_QUALIFIED_NAME);
-            final String typedRefQualifiedName = 
toTypedQualifiedName(typeName, refQualifiedName);
-
-            final Referenceable refFromCacheIfAvailable = 
typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
-                if (id.isAssigned()) {
-                    // If this referenceable has Guid assigned, then add this 
one to cache.
-                    guidToQualifiedName.put(id._getId(), refQualifiedName);
-                    typedQualifiedNameToRef.put(typedRefQualifiedName, ref);
-                }
-                return ref;
-            });
-
-            return new Tuple<>(refQualifiedName, refFromCacheIfAvailable);
-        }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
-                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
-    }
-
-    public void close() {
-        if (notificationInterface != null) {
-            notificationInterface.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
new file mode 100644
index 0000000..4916337
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.hook;
+
+import org.apache.atlas.hook.AtlasHook;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification messages 
within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} 
should be used serially from a single thread.
+ */
+public class NiFiAtlasHook extends AtlasHook implements LineageContext {
+
+    public static final String NIFI_USER = "nifi";
+
+    private static final String CONF_PREFIX = "atlas.hook.nifi.";
+    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+
+    private NiFiAtlasClient atlasClient;
+
+    public void setAtlasClient(NiFiAtlasClient atlasClient) {
+        this.atlasClient = atlasClient;
+    }
+
+    @Override
+    protected String getNumberOfRetriesPropertyKey() {
+        return HOOK_NUM_RETRIES;
+    }
+
+
+    private final List<HookNotificationMessage> messages = new ArrayList<>();
+
+    @Override
+    public void addMessage(HookNotificationMessage message) {
+        messages.add(message);
+    }
+
+    /**
+     * Sends every message buffered via {@link #addMessage(HookNotificationMessage)} through a
+     * {@link NotificationSender}, which de-duplicates them and hands them to {@code notifyEntities}.
+     * <p>The buffer is always cleared afterwards, even if sending fails, so the same messages are
+     * not re-sent on the next commit and the buffer does not keep growing.</p>
+     */
+    public void commitMessages() {
+        try {
+            // NOTE(review): a new NotificationSender (and therefore new, empty lookup caches) is
+            // created for every commit; hold a single instance as a field if caching across
+            // commits is intended.
+            final NotificationSender notificationSender = new NotificationSender();
+            notificationSender.setAtlasClient(atlasClient);
+            notificationSender.send(messages, this::notifyEntities);
+        } finally {
+            messages.clear();
+        }
+    }
+
+    public void close() {
+        if (notificationInterface != null) {
+            notificationInterface.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
new file mode 100644
index 0000000..4d599f1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.hook;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.AtlasUtils;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_CREATE;
+import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class implements Atlas hook notification message deduplication 
mechanism.
+ * Separated from {@link NiFiAtlasHook} for better testability.
+ */
+class NotificationSender {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NotificationSender.class);
+
+    private NiFiAtlasClient atlasClient;
+
+    /**
+     * An index to resolve a qualifiedName from a GUID.
+     */
+    private final Map<String, String> guidToQualifiedName;
+    /**
+     * An index to resolve a Referenceable from a typeName::qualifiedName.
+     */
+    private final Map<String, Referenceable> typedQualifiedNameToRef;
+
+    private static <K, V> Map<K, V> createCache(final int maxSize) {
+        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                return size() > maxSize;
+            }
+        };
+    }
+
+    NotificationSender() {
+        final int qualifiedNameCacheSize = 10_000;
+        this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
+
+        final int dataSetRefCacheSize = 1_000;
+        this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
+    }
+
+    private class Metrics {
+        final long startedAt = System.currentTimeMillis();
+
+        /**
+         * The total number of messages passed to commitMessages.
+         */
+        int totalMessages;
+        /**
+         * The number of CreateEntityRequest messages before de-duplication.
+         */
+        int createMessages;
+        /**
+         * The number of unique CreateEntityRequest messages for 
'nifi_flow_path'.
+         */
+        int uniqueFlowPathCreates;
+        /**
+         * The number of unique CreateEntityRequest messages except 
'nifi_flow_path'.
+         */
+        int uniqueOtherCreates;
+        int partialNiFiFlowPathUpdates;
+        int uniquePartialNiFiFlowPathUpdates;
+        int otherMessages;
+        int flowPathSearched;
+        int dataSetSearched;
+        int dataSetCacheHit;
+
+        private String toLogString(String message) {
+            return String.format("%s, %d ms passed, totalMessages=%d," +
+                            " createMessages=%d, uniqueFlowPathCreates=%d, 
uniqueOtherCreates=%d," +
+                            " partialNiFiFlowPathUpdates=%d, 
uniquePartialNiFiFlowPathUpdates=%d, otherMessage=%d," +
+                            " flowPathSearched=%d, dataSetSearched=%d, 
dataSetCacheHit=%s," +
+                            " guidToQualifiedName.size=%d, 
typedQualifiedNameToRef.size=%d",
+                    message, System.currentTimeMillis() - startedAt, 
totalMessages,
+                    createMessages, uniqueFlowPathCreates, uniqueOtherCreates,
+                    partialNiFiFlowPathUpdates, 
uniquePartialNiFiFlowPathUpdates, otherMessages,
+                    flowPathSearched, dataSetSearched, dataSetCacheHit,
+                    guidToQualifiedName.size(), 
typedQualifiedNameToRef.size());
+        }
+    }
+
+    void setAtlasClient(NiFiAtlasClient atlasClient) {
+        this.atlasClient = atlasClient;
+    }
+
+    private Predicate<Referenceable> distinctReferenceable() {
+        final Set<String> keys = new HashSet<>();
+        return r -> {
+            final String key = 
AtlasUtils.toTypedQualifiedName(r.getTypeName(), (String) 
r.get(ATTR_QUALIFIED_NAME));
+            return keys.add(key);
+        };
+    }
+
+    /**
+     * Returns the list mapped to {@code key}, or an immutable empty list when there is none.
+     * Unlike {@code computeIfAbsent}, this never inserts an entry into {@code map}.
+     */
+    private <K, V> List<V> safeGet(Map<K, List<V>> map, K key) {
+        return map.getOrDefault(key, Collections.emptyList());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void mergeRefs(Referenceable r1, Referenceable r2) {
+        r1.set(ATTR_INPUTS, mergeRefs((Collection<Referenceable>) 
r1.get(ATTR_INPUTS), (Collection<Referenceable>) r2.get(ATTR_INPUTS)));
+        r1.set(ATTR_OUTPUTS, mergeRefs((Collection<Referenceable>) 
r1.get(ATTR_OUTPUTS), (Collection<Referenceable>) r2.get(ATTR_OUTPUTS)));
+    }
+
+    private Collection<Referenceable> mergeRefs(Collection<Referenceable> r1, 
Collection<Referenceable> r2) {
+        final boolean isR1Empty = r1 == null || r1.isEmpty();
+        final boolean isR2Empty = r2 == null || r2.isEmpty();
+
+        if (isR1Empty) {
+            // r2 may or may not have entities, don't have to merge r1.
+            return r2;
+        } else if (isR2Empty) {
+            // r1 has some entities, don't have to merge r2.
+            return r1;
+        }
+
+        // If both have entities, then need to be merged.
+        return Stream.concat(r1.stream(), 
r2.stream()).filter(distinctReferenceable()).collect(Collectors.toList());
+    }
+
+    /**
+     * <p>Send hook notification messages.
+     * In order to notify relationships between 'nifi_flow_path' and its 
inputs/outputs, this method sends messages in following order:</p>
+     * <ol>
+     *     <li>As a a single {@link 
org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest} 
message:
+     *         <ul>
+     *             <li>New entities except 'nifi_flow_path', including 
DataSets such as 'nifi_queue', 'kafka_topic' or 'hive_table' ... etc,
+     *             so that 'nifi_flow_path' can refer</li>
+     *             <li>New 'nifi_flow_path' entities, entities order is 
guaranteed in a single message</li>
+     *         </ul>
+     *     </li>
+     *     <li>Update 'nifi_flow_path' messages, before notifying update 
messages, this method fetches existing 'nifi_flow_path' entity
+     *     to merge new inputs/outputs element with existing ones, so that 
existing ones will not be removed.</li>
+     *     <li>Other messages except</li>
+     * </ol>
+     * <p>Messages having the same type and qualified name will be 
de-duplicated before being sent.</p>
+     * @param messages list of messages to be sent
+     * @param notifier responsible for sending notification messages, its 
accept method can be called multiple times
+     */
+    void send(final List<HookNotification.HookNotificationMessage> messages, 
final Consumer<List<HookNotification.HookNotificationMessage>> notifier) {
+        // Messages are notified in up to three batches, in this order:
+        //   1. at most one EntityCreateRequest bundling every new entity, non 'nifi_flow_path' entities first,
+        //   2. de-duplicated partial updates of existing 'nifi_flow_path' entities (always notified, possibly empty),
+        //   3. all other messages, unchanged (always notified, possibly empty).
+        // Metrics are logged in the finally block even if an exception is thrown.
+        final Metrics metrics = new Metrics();
+        try {
+            metrics.totalMessages = messages.size();
+
+            // true: ENTITY_CREATE messages, false: all other message types.
+            final Map<Boolean, List<HookNotification.HookNotificationMessage>> 
createAndOthers = messages.stream().collect(groupingBy(msg -> 
ENTITY_CREATE.equals(msg.getType())));
+
+            final List<HookNotification.HookNotificationMessage> creates = 
safeGet(createAndOthers, true);
+            metrics.createMessages = creates.size();
+
+            // Flatten the entities of all create messages, then split them into 'nifi_flow_path' (true) and others (false).
+            final Map<Boolean, List<Referenceable>> 
newFlowPathsAndOtherEntities = creates.stream()
+                    .flatMap(msg -> ((HookNotification.EntityCreateRequest) 
msg).getEntities().stream())
+                    .collect(groupingBy(ref -> 
TYPE_NIFI_FLOW_PATH.equals(ref.typeName)));
+
+            // Deduplicate same entity creation messages.
+            final List<Referenceable> newEntitiesExceptFlowPaths = 
safeGet(newFlowPathsAndOtherEntities, false)
+                    
.stream().filter(distinctReferenceable()).collect(Collectors.toList());
+
+            // Deduplicate same flow paths and also merge inputs and outputs
+            // NOTE: flow paths are keyed by qualifiedName with toMap, which does not guarantee ordering,
+            // so the resulting flow paths may not be in their incoming order.
+            final Collection<Referenceable> newFlowPaths = 
safeGet(newFlowPathsAndOtherEntities, true).stream()
+                    .collect(toMap(ref -> ref.get(ATTR_QUALIFIED_NAME), ref -> 
ref, (r1, r2) -> {
+                        // Merge inputs and outputs.
+                        mergeRefs(r1, r2);
+                        return r1;
+                    })).values();
+            metrics.uniqueFlowPathCreates = newFlowPaths.size();
+            metrics.uniqueOtherCreates = newEntitiesExceptFlowPaths.size();
+
+
+            // 1-1. Notify new entities except 'nifi_flow_path'
+            // 1-2. Notify new 'nifi_flow_path'
+            // Both go into a single EntityCreateRequest with the non 'nifi_flow_path' entities added first,
+            // so that new 'nifi_flow_path' entities can refer to DataSets created in the same message.
+            List<Referenceable> newEntities = new ArrayList<>();
+            newEntities.addAll(newEntitiesExceptFlowPaths);
+            newEntities.addAll(newFlowPaths);
+            if (!newEntities.isEmpty()) {
+                notifier.accept(Collections.singletonList(new 
HookNotification.EntityCreateRequest(NIFI_USER, newEntities)));
+            }
+
+            // Split the non-create messages: 'nifi_flow_path' partial updates addressed by qualifiedName (true), others (false).
+            final Map<Boolean, List<HookNotification.HookNotificationMessage>> 
partialNiFiFlowPathUpdateAndOthers
+                    = safeGet(createAndOthers, 
false).stream().collect(groupingBy(msg
+                    -> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
+                    && 
TYPE_NIFI_FLOW_PATH.equals(((HookNotification.EntityPartialUpdateRequest)msg).getTypeName())
+                    && 
ATTR_QUALIFIED_NAME.equals(((HookNotification.EntityPartialUpdateRequest)msg).getAttribute())
+            ));
+
+
+            // These updates are made against existing flow path entities.
+            final List<HookNotification.HookNotificationMessage> 
partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
+            final List<HookNotification.HookNotificationMessage> otherMessages 
= safeGet(partialNiFiFlowPathUpdateAndOthers, false);
+            metrics.partialNiFiFlowPathUpdates = 
partialNiFiFlowPathUpdates.size();
+            metrics.otherMessages = otherMessages.size();
+
+
+            // 2. Notify de-duplicated 'nifi_flow_path' updates
+            // All updates for the same flow path become one EntityPartialUpdateRequest whose inputs/outputs are the
+            // union (keyed by qualifiedName) of the references fetched from Atlas and those carried by the messages.
+            final List<HookNotification.HookNotificationMessage> 
deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> 
(HookNotification.EntityPartialUpdateRequest) msg)
+                    // Group by nifi_flow_path qualifiedName value.
+                    
.collect(groupingBy(HookNotification.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
+                    .map(entry -> {
+                        final String flowPathQualifiedName = entry.getKey();
+                        final Map<String, Referenceable> distinctInputs;
+                        final Map<String, Referenceable> distinctOutputs;
+                        final String flowPathGuid;
+                        try {
+                            // Fetch existing nifi_flow_path and its 
inputs/outputs.
+                            metrics.flowPathSearched++;
+                            final AtlasEntity.AtlasEntityWithExtInfo 
flowPathExt
+                                    = atlasClient.searchEntityDef(new 
AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, flowPathQualifiedName));
+                            final AtlasEntity flowPathEntity = 
flowPathExt.getEntity();
+                            flowPathGuid = flowPathEntity.getGuid();
+                            distinctInputs = 
toReferenceables(flowPathEntity.getAttribute(ATTR_INPUTS), metrics);
+                            distinctOutputs = 
toReferenceables(flowPathEntity.getAttribute(ATTR_OUTPUTS), metrics);
+
+                        } catch (AtlasServiceException e) {
+                            if 
(ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
+                                logger.debug("nifi_flow_path was not found for 
qualifiedName {}", flowPathQualifiedName);
+                            } else {
+                                logger.warn("Failed to retrieve nifi_flow_path 
with qualifiedName {} due to {}", flowPathQualifiedName, e, e);
+                            }
+                            // Updates for a flow path that could not be fetched are dropped (nulls are filtered out below).
+                            return null;
+                        }
+
+                        // Merge all inputs and outputs for this 
nifi_flow_path.
+                        // References already present (fetched from Atlas or added by an earlier message) are kept as they are.
+                        // NOTE(review): this relies on toReferenceables() returning a mutable Map, since put() is called on it.
+                        for (HookNotification.EntityPartialUpdateRequest msg : 
entry.getValue()) {
+                            
fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics)
+                                    .entrySet().stream().filter(ref -> 
!distinctInputs.containsKey(ref.getKey()))
+                                    .forEach(ref -> 
distinctInputs.put(ref.getKey(), ref.getValue()));
+
+                            
fromReferenceable(msg.getEntity().get(ATTR_OUTPUTS), metrics)
+                                    .entrySet().stream().filter(ref -> 
!distinctOutputs.containsKey(ref.getKey()))
+                                    .forEach(ref -> 
distinctOutputs.put(ref.getKey(), ref.getValue()));
+                        }
+
+                        // Consolidate messages into one.
+                        final Referenceable flowPathRef = new 
Referenceable(flowPathGuid, TYPE_NIFI_FLOW_PATH, null);
+                        // NOTE: distinctInputs.values() returns 
HashMap$Values, which causes following error. To avoid that, wrap with 
ArrayList:
+                        // org.json4s.package$MappingException: Can't find 
ScalaSig for class org.apache.atlas.typesystem.Referenceable
+                        flowPathRef.set(ATTR_INPUTS, new 
ArrayList<>(distinctInputs.values()));
+                        flowPathRef.set(ATTR_OUTPUTS, new 
ArrayList<>(distinctOutputs.values()));
+                        return new 
HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
+                                ATTR_QUALIFIED_NAME, flowPathQualifiedName, 
flowPathRef);
+                    })
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+
+            metrics.uniquePartialNiFiFlowPathUpdates = 
deduplicatedMessages.size();
+            // NOTE: this batch is notified even when it is empty.
+            notifier.accept(deduplicatedMessages);
+
+            // 3. Notify other messages
+            notifier.accept(otherMessages);
+
+        } finally {
+            logger.info(metrics.toLogString("Finished"));
+        }
+
+    }
+
+    /**
+     * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable keyed by qualifiedName.</p>
+     * <p>Atlas removes existing references that are not specified when a collection attribute is updated.
+     * In order to preserve existing DataSet references, existing elements should be passed within a partial update message.</p>
+     * <p>This method also populates entity cache for subsequent lookups.</p>
+     * <p>The returned map is always a mutable {@link HashMap}, because the caller adds new Referenceables to it.</p>
+     * @param _refs Contains references from an existing nifi_flow_path entity inputs or outputs attribute, or null.
+     * @param metrics Metrics of the current send operation, updated with cache hit and search counts.
+     * @return A mutable map of Referenceables keyed by qualifiedName.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, Referenceable> toReferenceables(Object _refs, Metrics metrics) {
+        if (_refs == null) {
+            // NOTE: The caller may add new Referenceables to this map,
+            // so Collections.emptyMap, which does not support addition, can not be used here.
+            return new HashMap<>();
+        }
+
+        final Collection<Map<String, Object>> refs = (Collection<Map<String, Object>>) _refs;
+        return refs.stream().map(ref -> {
+            // An existing reference is expected to have a GUID.
+            final String typeName = (String) ref.get(ATTR_TYPENAME);
+            final String guid = (String) ref.get(ATTR_GUID);
+
+            if (guidToQualifiedName.containsKey(guid)) {
+                metrics.dataSetCacheHit++;
+            }
+
+            final String refQualifiedName = guidToQualifiedName.computeIfAbsent(guid, k -> {
+                try {
+                    metrics.dataSetSearched++;
+                    final AtlasEntity.AtlasEntityWithExtInfo refExt = atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
+                    final String qualifiedName = (String) refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
+                    typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP));
+                    return qualifiedName;
+                } catch (AtlasServiceException e) {
+                    if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
+                        logger.warn("{} entity was not found for guid {}", typeName, guid);
+                    } else {
+                        logger.warn("Failed to retrieve {} with guid {} due to {}", typeName, guid, e);
+                    }
+                    // Returning null leaves no cache entry, so this guid will be looked up again next time.
+                    return null;
+                }
+            });
+
+            if (refQualifiedName == null) {
+                return null;
+            }
+            return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
+        }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
+                // If duplication happens, use new value.
+                // An explicit HashMap supplier is passed because Collectors.toMap makes no guarantee about the
+                // mutability of the map it returns, and the caller puts additional entries into the result.
+                .collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> {
+                    logger.debug("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", oldValue, newValue);
+                    return newValue;
+                }, HashMap::new));
+    }
+
+    /**
+     * <p>Convert Referenceables carried by a partial update message (inputs or outputs) to a map keyed by qualifiedName.</p>
+     * <p>Each Referenceable is looked up in the typed qualified name cache. A cached instance is returned when available,
+     * otherwise the given Referenceable is cached and returned. If the given Referenceable already has a GUID assigned,
+     * its GUID to qualifiedName mapping is cached as well.</p>
+     * @param _refs a collection of Referenceables, or null
+     * @param metrics metrics of the current send operation (currently not updated by this method)
+     * @return A map of Referenceables keyed by qualifiedName, or an empty immutable map if _refs is null.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, Referenceable> fromReferenceable(Object _refs, Metrics metrics) {
+        if (_refs == null) {
+            return Collections.emptyMap();
+        }
+
+        final Collection<Referenceable> refs = (Collection<Referenceable>) _refs;
+        return refs.stream().map(ref -> {
+            // This ref is created within this reporting cycle, and it may not have GUID assigned yet, if it is a brand new reference.
+            // If the cache has the Referenceable, then use it instead. Instances cached by Atlas lookups have a GUID assigned.
+            // NOTE(review): a brand new ref without a GUID is also cached below, since computeIfAbsent stores whatever
+            // the mapping function returns.
+            // Brand new Referenceables have to have all mandatory attributes.
+            final String typeName = ref.getTypeName();
+            final Id id = ref.getId();
+            final String refQualifiedName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName);
+
+            // computeIfAbsent stores the returned ref itself. The mapping function must not modify
+            // typedQualifiedNameToRef: per the Map.computeIfAbsent contract it should not modify the map,
+            // and HashMap throws ConcurrentModificationException on Java 9+ if it does.
+            final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
+                if (id.isAssigned()) {
+                    // If this referenceable has Guid assigned, also index it by Guid for subsequent lookups.
+                    guidToQualifiedName.put(id._getId(), refQualifiedName);
+                }
+                return ref;
+            });
+
+            return new Tuple<>(refQualifiedName, refFromCacheIfAvailable);
+        }).filter(tuple -> tuple.getValue() != null)
+                // If the same qualifiedName appears more than once, use the new value (consistent with toReferenceables)
+                // instead of failing the whole send with IllegalStateException (duplicate key).
+                .collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> newValue));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
index 11d6e8b..a253c7d 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
@@ -40,7 +40,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.nifi.atlas.AtlasUtils.toStr;
 import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
-import static org.apache.nifi.atlas.NiFiAtlasHook.NIFI_USER;
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW;
@@ -136,7 +136,7 @@ public abstract class AbstractLineageStrategy implements 
LineageStrategy {
     }
 
     @SuppressWarnings("unchecked")
-    private boolean addDataSetRefs(Set<Referenceable> refsToAdd, Referenceable 
nifiFlowPath, String targetAttribute) {
+    protected boolean addDataSetRefs(Set<Referenceable> refsToAdd, 
Referenceable nifiFlowPath, String targetAttribute) {
         if (refsToAdd != null && !refsToAdd.isEmpty()) {
 
             // If nifiFlowPath already has a given dataSetRef, then it needs 
not to be created.

http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
index 4437bfc..d3ac658 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
@@ -42,7 +42,9 @@ import java.util.zip.CRC32;
 import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
 import static org.apache.nifi.atlas.AtlasUtils.toStr;
 import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
 import static org.apache.nifi.provenance.ProvenanceEventType.DROP;
@@ -76,8 +78,36 @@ public class CompleteFlowPathLineage extends 
AbstractLineageStrategy {
             createCompleteFlowPath(nifiFlow, lineagePath, createdFlowPaths);
             for (Tuple<NiFiFlowPath, DataSetRefs> createdFlowPath : 
createdFlowPaths) {
                 final NiFiFlowPath flowPath = createdFlowPath.getKey();
-                createEntity(toReferenceable(flowPath, nifiFlow));
+                // NOTE 1: FlowPath creation and DataSet references should be 
reported separately
+                // 
------------------------------------------------------------------------------
+                // For example, with following provenance event inputs:
+                //   CREATE(F1), FORK (F1 -> F2, F3), DROP(F1), SEND (F2), 
SEND(F3), DROP(F2), DROP(F3),
+                // there is no guarantee that DROP(F2) and DROP(F3) are 
processed within the same cycle.
+                // If DROP(F3) is processed in different cycle, it needs to be 
added to the existing FlowPath
+                // that contains F1 -> F2, to be F1 -> F2, F3.
+                // Execution cycle 1: Path1 (source of F1 -> ForkA), 
ForkA_queue (F1 -> F2), Path2 (ForkA -> dest of F2)
+                // Execution cycle 2: Path1 (source of F1 -> ForkB), 
ForkB_queue (F1 -> F3), Path3 (ForkB -> dest of F3)
+
+                // NOTE 2: Both FlowPath creation and FlowPath update messages 
are required
+                // 
------------------------------------------------------------------------
+                // The first time a lineage is found, the nifi_flow_path and the DataSets it refers to are created.
+                // If we notified these entities with one message creating 3 entities (Path1, DataSet1, DataSet2)
+                // followed by one partial update message adding the lineage (DataSet1 -> Path1 -> DataSet2),
+                // the update message could be processed by Atlas before the create message.
+                // If that happens, the lineage among these entities is lost.
+                // On the other hand, as described in NOTE 1, there are cases where existing nifi_flow_paths need to be updated,
+                // and we cannot tell whether this is the first time the lineage is reported or a later one.
+                // So, we send both an entity creation message (including the DataSet references) and partial update messages.
+
+                // Create flow path entity with DataSet refs.
+                final Referenceable flowPathRef = toReferenceable(flowPath, 
nifiFlow);
+                final DataSetRefs dataSetRefs = createdFlowPath.getValue();
+                addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, 
ATTR_INPUTS);
+                addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, 
ATTR_OUTPUTS);
+                createEntity(flowPathRef);
+                // Also, sending partial update message to update existing 
flow_path.
                 addDataSetRefs(nifiFlow, Collections.singleton(flowPath), 
createdFlowPath.getValue());
+
             }
             createdFlowPaths.clear();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 29ae013..29926ba 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -56,8 +56,8 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.hook.NiFiAtlasHook;
 import org.apache.nifi.atlas.NiFiAtlasClient;
-import org.apache.nifi.atlas.NiFiAtlasHook;
 import org.apache.nifi.atlas.NiFiFlow;
 import org.apache.nifi.atlas.NiFiFlowAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;

http://git-wip-us.apache.org/repos/asf/nifi/blob/79a7014a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
new file mode 100644
index 0000000..8b356e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.hook;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.AtlasUtils;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_DATASET;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestNotificationSender {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestNotificationSender.class);
+
+    /**
+     * Test notifier that records every batch of messages it receives, in call order,
+     * so that assertions can inspect each batch by its notification index.
+     */
+    private static class Notifier implements Consumer<List<HookNotification.HookNotificationMessage>> {
+
+        private final List<List<HookNotification.HookNotificationMessage>> notifications = new ArrayList<>();
+
+        @Override
+        public void accept(List<HookNotification.HookNotificationMessage> messages) {
+            final int notificationIndex = notifications.size();
+            logger.info("notified at {}, {}", notificationIndex, messages);
+            notifications.add(messages);
+        }
+    }
+
+    /**
+     * With no messages, no EntityCreateRequest is sent. Only the two (empty) batches are notified:
+     * de-duplicated nifi_flow_path updates, then other messages.
+     */
+    @Test
+    public void testZeroMessage() {
+        final NotificationSender sender = new NotificationSender();
+        final List<HookNotification.HookNotificationMessage> messages = Collections.emptyList();
+        final Notifier notifier = new Notifier();
+        sender.send(messages, notifier);
+        // Exactly two notifications: an empty create request must not be sent.
+        assertEquals(2, notifier.notifications.size());
+        assertEquals(0, notifier.notifications.get(0).size());
+        assertEquals(0, notifier.notifications.get(1).size());
+    }
+
+    /**
+     * Builds a Referenceable of the given type that only carries its qualifiedName attribute.
+     */
+    private Referenceable createRef(String type, String qname) {
+        final Referenceable created = new Referenceable(type);
+        created.set(ATTR_QUALIFIED_NAME, qname);
+        return created;
+    }
+
+    /**
+     * Asserts that the notification at the given index is a single EntityCreateRequest containing exactly the
+     * expected entities, and that every non nifi_flow_path entity precedes all nifi_flow_path entities in it.
+     */
+    @SuppressWarnings("unchecked")
+    private void assertCreateMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
+        assertTrue(notifier.notifications.size() > notificationIndex);
+        final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+        assertEquals(1, messages.size());
+        final HookNotification.EntityCreateRequest message = (HookNotification.EntityCreateRequest) messages.get(0);
+        final List<Referenceable> actualEntities = message.getEntities();
+        assertEquals(expects.length, actualEntities.size());
+
+        // Check ordering on the ACTUAL message: NotificationSender puts other entities before nifi_flow_path entities,
+        // so that new flow paths can refer to DataSets created in the same message.
+        boolean hasFlowPathSeen = false;
+        for (Referenceable actual : actualEntities) {
+            if (TYPE_NIFI_FLOW_PATH.equals(actual.getTypeName())) {
+                hasFlowPathSeen = true;
+            } else {
+                assertFalse("Types other than nifi_flow_path should be created before any nifi_flow_path entity.", hasFlowPathSeen);
+            }
+        }
+
+        // The order among flow paths is not preserved by de-duplication at NotificationSender.
+        // Use typed qname map to assert contents regardless of ordering.
+        final Map<String, Referenceable> entities = actualEntities.stream().collect(Collectors.toMap(
+                ref -> AtlasUtils.toTypedQualifiedName(ref.getTypeName(), (String) ref.get(ATTR_QUALIFIED_NAME)), ref -> ref));
+
+        for (Referenceable expect : expects) {
+            final String typeName = expect.getTypeName();
+            final Referenceable actual = entities.get(AtlasUtils.toTypedQualifiedName(typeName, (String) expect.get(ATTR_QUALIFIED_NAME)));
+            assertNotNull(actual);
+            assertEquals(typeName, actual.getTypeName());
+            assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.get(ATTR_QUALIFIED_NAME));
+
+            if (TYPE_NIFI_FLOW_PATH.equals(typeName)) {
+                assertIOReferences(expect, actual, ATTR_INPUTS);
+                assertIOReferences(expect, actual, ATTR_OUTPUTS);
+            }
+        }
+    }
+
+    /**
+     * Asserts that the given inputs/outputs attribute of actual matches that of expect: same size, and elements
+     * with the same type name and qualifiedName in iteration order. Does nothing if expect does not set the attribute.
+     */
+    @SuppressWarnings("unchecked")
+    private void assertIOReferences(Referenceable expect, Referenceable actual, String attrName) {
+        final Collection<Referenceable> expectedRefs = (Collection<Referenceable>) expect.get(attrName);
+        if (expectedRefs == null) {
+            return;
+        }
+        final Collection<Referenceable> actualRefs = (Collection<Referenceable>) actual.get(attrName);
+        // Fail with a clear message rather than a NullPointerException when the attribute is missing.
+        assertNotNull("Expected " + attrName + " to be set on " + actual.get(ATTR_QUALIFIED_NAME), actualRefs);
+        assertEquals(expectedRefs.size(), actualRefs.size());
+        final Iterator<Referenceable> actualIterator = actualRefs.iterator();
+        for (Referenceable expectedRef : expectedRefs) {
+            final Referenceable actualRef = actualIterator.next();
+            assertEquals(expectedRef.getTypeName(), actualRef.getTypeName());
+            assertEquals(expectedRef.get(ATTR_QUALIFIED_NAME), actualRef.get(ATTR_QUALIFIED_NAME));
+        }
+    }
+
+    /**
+     * Asserts that the notification at the given index holds one EntityPartialUpdateRequest per expected flow path,
+     * in the same order, each addressed by qualifiedName, with inputs/outputs contained in the expected ones.
+     * NOTE(review): containment is checked in one direction only (actual is a subset of expected), and the
+     * message order depends on how NotificationSender groups updates.
+     */
+    @SuppressWarnings("unchecked")
+    private void assertUpdateFlowPathMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
+        assertTrue(notifier.notifications.size() > notificationIndex);
+        final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+        assertEquals(expects.length, messages.size());
+
+        int position = 0;
+        for (Referenceable expected : expects) {
+            final HookNotification.EntityPartialUpdateRequest update = (HookNotification.EntityPartialUpdateRequest) messages.get(position);
+            position++;
+
+            assertEquals(expected.getTypeName(), update.getTypeName());
+            assertEquals(ATTR_QUALIFIED_NAME, update.getAttribute());
+            assertEquals(expected.get(ATTR_QUALIFIED_NAME), update.getAttributeValue());
+
+            final Collection expectedInputs = (Collection) expected.get(ATTR_INPUTS);
+            final Collection expectedOutputs = (Collection) expected.get(ATTR_OUTPUTS);
+            final Collection actualInputs = (Collection) update.getEntity().get(ATTR_INPUTS);
+            final Collection actualOutputs = (Collection) update.getEntity().get(ATTR_OUTPUTS);
+            assertTrue(expectedInputs.containsAll(actualInputs));
+            assertTrue(expectedOutputs.containsAll(actualOutputs));
+        }
+    }
+
+    /**
+     * A single create message for one nifi_queue results in one EntityCreateRequest holding just that queue.
+     */
+    @Test
+    public void testOneCreateDataSetMessage() {
+        final Referenceable queue1 = createRef(TYPE_NIFI_QUEUE, "queue1@test");
+        final HookNotification.HookNotificationMessage createQueue = new HookNotification.EntityCreateRequest(NIFI_USER, queue1);
+        final Notifier notifier = new Notifier();
+
+        new NotificationSender().send(Collections.singletonList(createQueue), notifier);
+
+        // The first notification is an EntityCreateRequest containing the nifi_queue.
+        assertCreateMessage(notifier, 0, queue1);
+    }
+
+    @Test
+    public void testCreateDataSetMessageDeduplication() {
+        // Simulating complete path, that there were 4 FlowFiles went through 
the same partial flow.
+        // FF1 was ingested from data1, and so was FF2 from data2.
+        // Then FF1 was FORKed to FF11 and FF12, then sent to data11 and 
data12.
+        // Similarly FF2 was FORKed to FF21 and FF22, then sent to data21 and 
data22.
+        // FF3 went through the same as FF1, so did FF4 as FF2.
+        // All of those provenance events were processed within a single 
ReportLineageToAtlas cycle.
+
+        // FF1: data1 -> pathA1 -> FORK11 (FF11) -> pathB11 -> data11
+        //                      -> FORK12 (FF12) -> pathB12 -> data12
+        // FF2: data2 -> pathA2 -> FORK21 (FF21) -> pathB21 -> data21
+        //                      -> FORK22 (FF22) -> pathB22 -> data22
+        // FF3: data1 -> pathA1 -> FORK11 (FF31) -> pathB11 -> data11
+        //                      -> FORK12 (FF32) -> pathB12 -> data12
+        // FF4: data2 -> pathA2 -> FORK21 (FF41) -> pathB21 -> data21
+        //                      -> FORK22 (FF42) -> pathB22 -> data22
+
+        // As a result, following lineages are reported to Atlas:
+        // data1 -> pathA1 -> FORK11 -> pathB11 -> data11
+        //                 -> FORK12 -> pathB12 -> data12
+        // data2 -> pathA2 -> FORK21 -> pathB21 -> data21
+        //                 -> FORK22 -> pathB22 -> data22
+
+        final NotificationSender sender = new NotificationSender();
+
+        // From FF1
+        final Referenceable ff1_data1 = createRef(TYPE_DATASET, "data1@test");
+        final Referenceable ff1_data11 = createRef(TYPE_DATASET, 
"data11@test");
+        final Referenceable ff1_data12 = createRef(TYPE_DATASET, 
"data12@test");
+        final Referenceable ff1_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0001@test");
+        final Referenceable ff1_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0001@test");
+        final Referenceable ff1_fork11 = createRef(TYPE_NIFI_QUEUE, 
"B::0011@test");
+        final Referenceable ff1_fork12 = createRef(TYPE_NIFI_QUEUE, 
"B::0012@test");
+        final Referenceable ff1_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0011@test");
+        final Referenceable ff1_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0012@test");
+        // From FF11
+        ff1_pathA11.set(ATTR_INPUTS, singleton(ff1_data1));
+        ff1_pathA11.set(ATTR_OUTPUTS, singleton(ff1_fork11));
+        ff1_pathB11.set(ATTR_INPUTS, singleton(ff1_fork11));
+        ff1_pathB11.set(ATTR_OUTPUTS, singleton(ff1_data11));
+        // From FF12
+        ff1_pathA12.set(ATTR_INPUTS, singleton(ff1_data1));
+        ff1_pathA12.set(ATTR_OUTPUTS, singleton(ff1_fork12));
+        ff1_pathB12.set(ATTR_INPUTS, singleton(ff1_fork12));
+        ff1_pathB12.set(ATTR_OUTPUTS, singleton(ff1_data12));
+
+        // From FF2
+        final Referenceable ff2_data2 = createRef(TYPE_DATASET, "data2@test");
+        final Referenceable ff2_data21 = createRef(TYPE_DATASET, 
"data21@test");
+        final Referenceable ff2_data22 = createRef(TYPE_DATASET, 
"data22@test");
+        final Referenceable ff2_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0002@test");
+        final Referenceable ff2_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0002@test");
+        final Referenceable ff2_fork21 = createRef(TYPE_NIFI_QUEUE, 
"B::0021@test");
+        final Referenceable ff2_fork22 = createRef(TYPE_NIFI_QUEUE, 
"B::0022@test");
+        final Referenceable ff2_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0021@test");
+        final Referenceable ff2_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0022@test");
+        // From FF21
+        ff2_pathA21.set(ATTR_INPUTS, singleton(ff2_data2));
+        ff2_pathA21.set(ATTR_OUTPUTS, singleton(ff2_fork21));
+        ff2_pathB21.set(ATTR_INPUTS, singleton(ff2_fork21));
+        ff2_pathB21.set(ATTR_OUTPUTS, singleton(ff2_data21));
+        // From FF22
+        ff2_pathA22.set(ATTR_INPUTS, singleton(ff2_data2));
+        ff2_pathA22.set(ATTR_OUTPUTS, singleton(ff2_fork22));
+        ff2_pathB22.set(ATTR_INPUTS, singleton(ff2_fork22));
+        ff2_pathB22.set(ATTR_OUTPUTS, singleton(ff2_data22));
+
+        // From FF3
+        final Referenceable ff3_data1 = createRef(TYPE_DATASET, "data1@test");
+        final Referenceable ff3_data11 = createRef(TYPE_DATASET, 
"data11@test");
+        final Referenceable ff3_data12 = createRef(TYPE_DATASET, 
"data12@test");
+        final Referenceable ff3_pathA11 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0001@test");
+        final Referenceable ff3_pathA12 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0001@test");
+        final Referenceable ff3_fork11 = createRef(TYPE_NIFI_QUEUE, 
"B::0011@test");
+        final Referenceable ff3_fork12 = createRef(TYPE_NIFI_QUEUE, 
"B::0012@test");
+        final Referenceable ff3_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0011@test");
+        final Referenceable ff3_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0012@test");
+        // From FF31
+        ff3_pathA11.set(ATTR_INPUTS, singleton(ff3_data1));
+        ff3_pathA11.set(ATTR_OUTPUTS, singleton(ff3_fork11));
+        ff3_pathB11.set(ATTR_INPUTS, singleton(ff3_fork11));
+        ff3_pathB11.set(ATTR_OUTPUTS, singleton(ff3_data11));
+        // From FF32
+        ff3_pathA12.set(ATTR_INPUTS, singleton(ff3_data1));
+        ff3_pathA12.set(ATTR_OUTPUTS, singleton(ff3_fork12));
+        ff3_pathB12.set(ATTR_INPUTS, singleton(ff3_fork12));
+        ff3_pathB12.set(ATTR_OUTPUTS, singleton(ff3_data12));
+
+        // From FF4
+        final Referenceable ff4_data2 = createRef(TYPE_DATASET, "data2@test");
+        final Referenceable ff4_data21 = createRef(TYPE_DATASET, 
"data21@test");
+        final Referenceable ff4_data22 = createRef(TYPE_DATASET, 
"data22@test");
+        final Referenceable ff4_pathA21 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0002@test");
+        final Referenceable ff4_pathA22 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0002@test");
+        final Referenceable ff4_fork21 = createRef(TYPE_NIFI_QUEUE, 
"B::0021@test");
+        final Referenceable ff4_fork22 = createRef(TYPE_NIFI_QUEUE, 
"B::0022@test");
+        final Referenceable ff4_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0021@test");
+        final Referenceable ff4_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0022@test");
+        // From FF41
+        ff4_pathA21.set(ATTR_INPUTS, singleton(ff4_data2));
+        ff4_pathA21.set(ATTR_OUTPUTS, singleton(ff4_fork21));
+        ff4_pathB21.set(ATTR_INPUTS, singleton(ff4_fork21));
+        ff4_pathB21.set(ATTR_OUTPUTS, singleton(ff4_data21));
+        // From FF42
+        ff4_pathA22.set(ATTR_INPUTS, singleton(ff4_data2));
+        ff4_pathA22.set(ATTR_OUTPUTS, singleton(ff4_fork22));
+        ff4_pathB22.set(ATTR_INPUTS, singleton(ff4_fork22));
+        ff4_pathB22.set(ATTR_OUTPUTS, singleton(ff4_data22));
+
+
+        final List<HookNotification.HookNotificationMessage> messages = asList(
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data1),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_data11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_data12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_pathA11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_pathA12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_fork11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_fork12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_pathB11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff1_pathB12),
+
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data2),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_data21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_data22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_pathA21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_pathA22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_fork21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_fork22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_pathB21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff2_pathB22),
+
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data1),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_data11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_data12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_pathA11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_pathA12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_fork11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_fork12),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_pathB11),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff3_pathB12),
+
+                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data2),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_data21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_data22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_pathA21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_pathA22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_fork21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_fork22),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_pathB21),
+                new HookNotification.EntityCreateRequest(NIFI_USER, 
ff4_pathB22)
+        );
+
+        final Notifier notifier = new Notifier();
+        sender.send(messages, notifier);
+
+        // EntityCreateRequest, same entities get de-duplicated. 
nifi_flow_path is created after other types.
+        final Referenceable r_data1 = createRef(TYPE_DATASET, "data1@test");
+        final Referenceable r_data11 = createRef(TYPE_DATASET, "data11@test");
+        final Referenceable r_data12 = createRef(TYPE_DATASET, "data12@test");
+        final Referenceable r_pathA1 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0001@test");
+        final Referenceable r_fork11 = createRef(TYPE_NIFI_QUEUE, 
"B::0011@test");
+        final Referenceable r_fork12 = createRef(TYPE_NIFI_QUEUE, 
"B::0012@test");
+        final Referenceable r_pathB11 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0011@test");
+        final Referenceable r_pathB12 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0012@test");
+        r_pathA1.set(ATTR_INPUTS, singleton(r_data1));
+        r_pathA1.set(ATTR_OUTPUTS, asList(r_fork11, r_fork12));
+        r_pathB11.set(ATTR_INPUTS, singleton(r_fork11));
+        r_pathB11.set(ATTR_OUTPUTS, singleton(r_data11));
+        r_pathB12.set(ATTR_INPUTS, singleton(r_fork12));
+        r_pathB12.set(ATTR_OUTPUTS, singleton(r_data12));
+
+        final Referenceable r_data2 = createRef(TYPE_DATASET, "data2@test");
+        final Referenceable r_data21 = createRef(TYPE_DATASET, "data21@test");
+        final Referenceable r_data22 = createRef(TYPE_DATASET, "data22@test");
+        final Referenceable r_pathA2 = createRef(TYPE_NIFI_FLOW_PATH, 
"A::0002@test");
+        final Referenceable r_fork21 = createRef(TYPE_NIFI_QUEUE, 
"B::0021@test");
+        final Referenceable r_fork22 = createRef(TYPE_NIFI_QUEUE, 
"B::0022@test");
+        final Referenceable r_pathB21 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0021@test");
+        final Referenceable r_pathB22 = createRef(TYPE_NIFI_FLOW_PATH, 
"B::0022@test");
+        r_pathA2.set(ATTR_INPUTS, singleton(r_data2));
+        r_pathA2.set(ATTR_OUTPUTS, asList(r_fork21, r_fork22));
+        r_pathB21.set(ATTR_INPUTS, singleton(r_fork21));
+        r_pathB21.set(ATTR_OUTPUTS, singleton(r_data21));
+        r_pathB22.set(ATTR_INPUTS, singleton(r_fork22));
+        r_pathB22.set(ATTR_OUTPUTS, singleton(r_data22));
+
+        assertCreateMessage(notifier, 0,
+                r_data1, r_data11, r_data12, r_fork11, r_fork12,
+                r_data2, r_data21, r_data22, r_fork21, r_fork22,
+                r_pathA1, r_pathB11, r_pathB12,
+                r_pathA2, r_pathB21, r_pathB22);
+    }
+
+    private Map<String, String> createGuidReference(String type, String guid) {
+        Map<String, String> map = new HashMap<>();
+        map.put(ATTR_TYPENAME, type);
+        map.put(ATTR_GUID, guid);
+        return map;
+    }
+
+    /**
+     * Verifies that newly found DataSets (fileC, fileD) are sent first as a create message,
+     * and that the partial update for the existing nifi_flow_path 'path1' is sent next.
+     * That update must merge path1's existing inputs/outputs (GUID references to fileA and
+     * fileB, resolved through the mocked Atlas client) with the newly reported ones.
+     */
+    @Test
+    public void testUpdateFlowPath() throws AtlasServiceException {
+        final NotificationSender sender = new NotificationSender();
+        final Referenceable fileC = createRef("fs_path", "/tmp/file-c.txt@test");
+        final Referenceable fileD = createRef("fs_path", "/tmp/file-d.txt@test");
+
+        // New in/out fileC and fileD are found for path1.
+        final Referenceable newPath1Lineage = createRef(TYPE_NIFI_FLOW_PATH, "path1@test");
+        newPath1Lineage.set(ATTR_INPUTS, singleton(fileC));
+        newPath1Lineage.set(ATTR_OUTPUTS, singleton(fileD));
+
+        final List<HookNotification.HookNotificationMessage> messages = asList(
+                new HookNotification.EntityCreateRequest(NIFI_USER, fileC),
+                new HookNotification.EntityCreateRequest(NIFI_USER, fileD),
+                new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
+                        ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
+        );
+
+        final NiFiAtlasClient atlasClient = mock(NiFiAtlasClient.class);
+        sender.setAtlasClient(atlasClient);
+
+        // Existing nifi_flow_path, whose current inputs/outputs are GUID references to fileA and fileB.
+        final AtlasEntity path1Entity = new AtlasEntity(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test");
+        path1Entity.setGuid("path1-guid");
+        path1Entity.setAttribute(ATTR_INPUTS, singleton(createGuidReference("fs_path", "fileA-guid")));
+        path1Entity.setAttribute(ATTR_OUTPUTS, singleton(createGuidReference("fs_path", "fileB-guid")));
+
+        final AtlasEntity fileAEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-a.txt@test");
+        fileAEntity.setGuid("fileA-guid");
+
+        // The GUID must match both the reference held in path1's outputs and the lookup key stubbed below.
+        final AtlasEntity fileBEntity = new AtlasEntity("fs_path", ATTR_QUALIFIED_NAME, "file-b.txt@test");
+        fileBEntity.setGuid("fileB-guid");
+
+        final AtlasEntity.AtlasEntityWithExtInfo path1Ext = new AtlasEntity.AtlasEntityWithExtInfo(path1Entity);
+        final AtlasEntity.AtlasEntityWithExtInfo fileAExt = new AtlasEntity.AtlasEntityWithExtInfo(fileAEntity);
+        final AtlasEntity.AtlasEntityWithExtInfo fileBExt = new AtlasEntity.AtlasEntityWithExtInfo(fileBEntity);
+        when(atlasClient.searchEntityDef(eq(new AtlasObjectId(TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test"))))
+                .thenReturn(path1Ext);
+        when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileA-guid")))).thenReturn(fileAExt);
+        when(atlasClient.searchEntityDef(eq(new AtlasObjectId("fileB-guid")))).thenReturn(fileBExt);
+
+        final Notifier notifier = new Notifier();
+        sender.send(messages, notifier);
+
+        // Message 0: creation of the new DataSets; message 1: the merged flow path update.
+        assertCreateMessage(notifier, 0, fileC, fileD);
+        final Referenceable updatedPath1 = createRef(TYPE_NIFI_FLOW_PATH, "path1@test");
+        updatedPath1.set(ATTR_INPUTS, asList(new Referenceable("fileA-guid", "fs_path", Collections.emptyMap()), fileC));
+        updatedPath1.set(ATTR_OUTPUTS, asList(new Referenceable("fileB-guid", "fs_path", Collections.emptyMap()), fileD));
+        assertUpdateFlowPathMessage(notifier, 1, updatedPath1);
+    }
+
+}

Reply via email to