This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 599fb98  NIFI-6937 - NotificationSender uses typedQualifiedName 
instead of simple qualifiedName as keys in local maps.
599fb98 is described below

commit 599fb9841579e83f37cb0902ff2124ffda468b98
Author: Tamas Palfy <[email protected]>
AuthorDate: Thu Dec 12 14:46:31 2019 +0100

    NIFI-6937 - NotificationSender uses typedQualifiedName instead of simple 
qualifiedName as keys in local maps.
    
    NIFI-6937 - Fix NotificationSender: typedQualifiedName handling.
    
    This closes #3929.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../apache/nifi/atlas/hook/NotificationSender.java | 26 +++++++++++-----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
index c06e254..66c21df 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
@@ -70,7 +70,7 @@ class NotificationSender {
     /**
      * An index to resolve a qualifiedName from a GUID.
      */
-    private final Map<String, String> guidToQualifiedName;
+    private final Map<String, String> guidToTypedQualifiedName;
     /**
      * An index to resolve a Referenceable from a typeName::qualifiedName.
      */
@@ -87,7 +87,7 @@ class NotificationSender {
 
     NotificationSender() {
         final int qualifiedNameCacheSize = 10_000;
-        this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
+        this.guidToTypedQualifiedName = createCache(qualifiedNameCacheSize);
 
         final int dataSetRefCacheSize = 1_000;
         this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
@@ -129,7 +129,7 @@ class NotificationSender {
                     createMessages, uniqueFlowPathCreates, uniqueOtherCreates,
                     partialNiFiFlowPathUpdates, 
uniquePartialNiFiFlowPathUpdates, otherMessages,
                     flowPathSearched, dataSetSearched, dataSetCacheHit,
-                    guidToQualifiedName.size(), 
typedQualifiedNameToRef.size());
+                    guidToTypedQualifiedName.size(), 
typedQualifiedNameToRef.size());
         }
     }
 
@@ -309,7 +309,7 @@ class NotificationSender {
     }
 
     /**
-     * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable 
keyed by qualifiedName.</p>
+     * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable 
keyed by type + qualifiedName.</p>
      * <p>Atlas removes existing references those are not specified when a 
collection attribute is updated.
      * In order to preserve existing DataSet references, existing elements 
should be passed within a partial update message.</p>
      * <p>This method also populates entity cache for subsequent lookups.</p>
@@ -329,17 +329,18 @@ class NotificationSender {
             final String typeName = (String) ref.get(ATTR_TYPENAME);
             final String guid = (String) ref.get(ATTR_GUID);
 
-            if (guidToQualifiedName.containsKey(guid)) {
+            if (guidToTypedQualifiedName.containsKey(guid)) {
                 metrics.dataSetCacheHit++;
             }
 
-            final String refQualifiedName = 
guidToQualifiedName.computeIfAbsent(guid, k -> {
+            final String typedQualifiedName = 
guidToTypedQualifiedName.computeIfAbsent(guid, k -> {
                 try {
                     metrics.dataSetSearched++;
                     final AtlasEntity.AtlasEntityWithExtInfo refExt = 
atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
                     final String qualifiedName = (String) 
refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
-                    typedQualifiedNameToRef.put(toTypedQualifiedName(typeName, 
qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP));
-                    return qualifiedName;
+                    String _typedQualifiedName = 
toTypedQualifiedName(typeName, qualifiedName);
+                    typedQualifiedNameToRef.put(_typedQualifiedName, new 
Referenceable(guid, typeName, Collections.EMPTY_MAP));
+                    return _typedQualifiedName;
                 } catch (AtlasServiceException e) {
                     if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) 
{
                         logger.warn("{} entity was not found for guid {}", 
typeName, guid);
@@ -350,10 +351,10 @@ class NotificationSender {
                 }
             });
 
-            if (refQualifiedName == null) {
+            if (typedQualifiedName == null) {
                 return null;
             }
-            return new Tuple<>(refQualifiedName, 
typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
+            return new Tuple<>(typedQualifiedName, 
typedQualifiedNameToRef.get(typedQualifiedName));
         }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
                 // If duplication happens, use new value.
                 .collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue, 
newValue) -> {
@@ -381,13 +382,12 @@ class NotificationSender {
             final Referenceable refFromCacheIfAvailable = 
typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
                 if (id.isAssigned()) {
                     // If this referenceable has Guid assigned, then add this 
one to cache.
-                    guidToQualifiedName.put(id._getId(), refQualifiedName);
-                    typedQualifiedNameToRef.put(typedRefQualifiedName, ref);
+                    guidToTypedQualifiedName.put(id._getId(), 
typedRefQualifiedName);
                 }
                 return ref;
             });
 
-            return new Tuple<>(refQualifiedName, refFromCacheIfAvailable);
+            return new Tuple<>(typedRefQualifiedName, refFromCacheIfAvailable);
         }).filter(tuple -> tuple.getValue() != null)
                 .collect(toMap(Tuple::getKey, Tuple::getValue));
     }

Reply via email to