This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 599fb98 NIFI-6937 - NotificationSender uses typedQualifiedName
instead of simple qualifiedName as keys in local maps.
599fb98 is described below
commit 599fb9841579e83f37cb0902ff2124ffda468b98
Author: Tamas Palfy <[email protected]>
AuthorDate: Thu Dec 12 14:46:31 2019 +0100
NIFI-6937 - NotificationSender uses typedQualifiedName instead of simple
qualifiedName as keys in local maps.
NIFI-6937 - Fix NotificationSender: typedQualifiedName handling.
This closes #3929.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../apache/nifi/atlas/hook/NotificationSender.java | 26 +++++++++++-----------
1 file changed, 13 insertions(+), 13 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
index c06e254..66c21df 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
@@ -70,7 +70,7 @@ class NotificationSender {
/**
* An index to resolve a qualifiedName from a GUID.
*/
- private final Map<String, String> guidToQualifiedName;
+ private final Map<String, String> guidToTypedQualifiedName;
/**
* An index to resolve a Referenceable from a typeName::qualifiedName.
*/
@@ -87,7 +87,7 @@ class NotificationSender {
NotificationSender() {
final int qualifiedNameCacheSize = 10_000;
- this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
+ this.guidToTypedQualifiedName = createCache(qualifiedNameCacheSize);
final int dataSetRefCacheSize = 1_000;
this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
@@ -129,7 +129,7 @@ class NotificationSender {
createMessages, uniqueFlowPathCreates, uniqueOtherCreates,
partialNiFiFlowPathUpdates,
uniquePartialNiFiFlowPathUpdates, otherMessages,
flowPathSearched, dataSetSearched, dataSetCacheHit,
- guidToQualifiedName.size(),
typedQualifiedNameToRef.size());
+ guidToTypedQualifiedName.size(),
typedQualifiedNameToRef.size());
}
}
@@ -309,7 +309,7 @@ class NotificationSender {
}
/**
- * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable
keyed by qualifiedName.</p>
+ * <p>Convert nifi_flow_path inputs or outputs to a map of Referenceable
keyed by type + qualifiedName.</p>
* <p>Atlas removes existing references those are not specified when a
collection attribute is updated.
* In order to preserve existing DataSet references, existing elements
should be passed within a partial update message.</p>
* <p>This method also populates entity cache for subsequent lookups.</p>
@@ -329,17 +329,18 @@ class NotificationSender {
final String typeName = (String) ref.get(ATTR_TYPENAME);
final String guid = (String) ref.get(ATTR_GUID);
- if (guidToQualifiedName.containsKey(guid)) {
+ if (guidToTypedQualifiedName.containsKey(guid)) {
metrics.dataSetCacheHit++;
}
- final String refQualifiedName =
guidToQualifiedName.computeIfAbsent(guid, k -> {
+ final String typedQualifiedName =
guidToTypedQualifiedName.computeIfAbsent(guid, k -> {
try {
metrics.dataSetSearched++;
final AtlasEntity.AtlasEntityWithExtInfo refExt =
atlasClient.searchEntityDef(new AtlasObjectId(guid, typeName));
final String qualifiedName = (String)
refExt.getEntity().getAttribute(ATTR_QUALIFIED_NAME);
- typedQualifiedNameToRef.put(toTypedQualifiedName(typeName,
qualifiedName), new Referenceable(guid, typeName, Collections.EMPTY_MAP));
- return qualifiedName;
+ String _typedQualifiedName =
toTypedQualifiedName(typeName, qualifiedName);
+ typedQualifiedNameToRef.put(_typedQualifiedName, new
Referenceable(guid, typeName, Collections.EMPTY_MAP));
+ return _typedQualifiedName;
} catch (AtlasServiceException e) {
if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus()))
{
logger.warn("{} entity was not found for guid {}",
typeName, guid);
@@ -350,10 +351,10 @@ class NotificationSender {
}
});
- if (refQualifiedName == null) {
+ if (typedQualifiedName == null) {
return null;
}
- return new Tuple<>(refQualifiedName,
typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
+ return new Tuple<>(typedQualifiedName,
typedQualifiedNameToRef.get(typedQualifiedName));
}).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
// If duplication happens, use new value.
.collect(toMap(Tuple::getKey, Tuple::getValue, (oldValue,
newValue) -> {
@@ -381,13 +382,12 @@ class NotificationSender {
final Referenceable refFromCacheIfAvailable =
typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
if (id.isAssigned()) {
// If this referenceable has Guid assigned, then add this
one to cache.
- guidToQualifiedName.put(id._getId(), refQualifiedName);
- typedQualifiedNameToRef.put(typedRefQualifiedName, ref);
+ guidToTypedQualifiedName.put(id._getId(),
typedRefQualifiedName);
}
return ref;
});
- return new Tuple<>(refQualifiedName, refFromCacheIfAvailable);
+ return new Tuple<>(typedRefQualifiedName, refFromCacheIfAvailable);
}).filter(tuple -> tuple.getValue() != null)
.collect(toMap(Tuple::getKey, Tuple::getValue));
}