This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new dae1043 ATLAS-3874: NotificationHookConsumer: Concurrent Message Processing
dae1043 is described below
commit dae104355697ae1280fe7dc6b0d9a685a53caee3
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Mon Aug 31 22:54:15 2020 -0700
ATLAS-3874: NotificationHookConsumer: Concurrent Message Processing
Change-Id: Ia5ccc6aeea2b0c44493636243b7627d16ad32c8f
Signed-off-by: Sarath Subramanian <[email protected]>
(cherry picked from commit 3468c2b4ac1175849ce119dcd69f5518b1d9a342)
---
.../store/graph/EntityGraphDiscoveryContext.java | 10 ++++++++--
.../atlas/notification/NotificationHookConsumer.java | 16 ++++++++++++++++
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java
index 2221ac4..d49bc5e 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java
@@ -96,11 +96,17 @@ public final class EntityGraphDiscoveryContext {
}
public AtlasVertex getResolvedEntityVertex(AtlasObjectId objId) {
- if (objId instanceof AtlasRelatedObjectId) {
+ if (resolvedIdsByUniqAttribs.containsKey(objId)) {
+ return getAtlasVertexFromResolvedIdsByAttribs(objId);
+ } else if (objId instanceof AtlasRelatedObjectId) {
objId = new AtlasObjectId(objId.getGuid(), objId.getTypeName(),
objId.getUniqueAttributes());
}
- AtlasVertex vertex = resolvedIdsByUniqAttribs.get(objId);
+ return getAtlasVertexFromResolvedIdsByAttribs(objId);
+ }
+
+ private AtlasVertex getAtlasVertexFromResolvedIdsByAttribs(AtlasObjectId
objId) {
+ AtlasVertex vertex = resolvedIdsByUniqAttribs.get(objId);
// check also for sub-types; ref={typeName=Asset; guid=abcd} should match {typeName=hive_table; guid=abcd}
if (vertex == null) {
final AtlasEntityType entityType =
typeRegistry.getEntityTypeByName(objId.getTypeName());
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 3f1ea05..f02c05f 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -117,6 +117,8 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private static final String TYPE_HIVE_COLUMN_LINEAGE =
"hive_column_lineage";
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION =
"JanusGraphException";
+ private static final String
EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE =
"_dummy_database";
@@ -624,6 +626,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
}
// Used for intermediate conversions during create and update
+ String exceptionClassName = StringUtils.EMPTY;
for (int numRetries = 0; numRetries < maxRetries;
numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}",
message.getType().name(), numRetries);
@@ -783,9 +786,15 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
throw new IllegalStateException("Unknown
notification type: " + message.getType().name());
}
+ if (StringUtils.isNotEmpty(exceptionClassName)) {
+ LOG.warn("{}: Pausing & retry: Try: {}: Pause: {}
ms. Handled!",
+ exceptionClassName,
numRetries, adaptiveWaiter.waitDuration);
+ exceptionClassName = StringUtils.EMPTY;
+ }
break;
} catch (Throwable e) {
RequestContext.get().resetEntityGuidUpdates();
+ exceptionClassName = e.getClass().getSimpleName();
if (numRetries == (maxRetries - 1)) {
String strMessage =
AbstractNotification.getMessageJson(message);
@@ -800,6 +809,13 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
recordFailedMessages();
}
return;
+ } else if (e instanceof
org.apache.atlas.repository.graphdb.AtlasSchemaViolationException
+ ||
exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION)
+ ||
exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION)) {
+ LOG.warn("{}: Pausing & retry: Try: {}: Pause: {}
ms. {}",
+ exceptionClassName, numRetries,
adaptiveWaiter.waitDuration, e.getMessage());
+
+ adaptiveWaiter.pause((Exception) e);
} else {
LOG.warn("Error handling message", e);