This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new dae1043  ATLAS-3874: NotificationHookConsumer: Concurrent Message Processing
dae1043 is described below

commit dae104355697ae1280fe7dc6b0d9a685a53caee3
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Mon Aug 31 22:54:15 2020 -0700

    ATLAS-3874: NotificationHookConsumer: Concurrent Message Processing
    
    Change-Id: Ia5ccc6aeea2b0c44493636243b7627d16ad32c8f
    Signed-off-by: Sarath Subramanian <[email protected]>
    (cherry picked from commit 3468c2b4ac1175849ce119dcd69f5518b1d9a342)
---
 .../store/graph/EntityGraphDiscoveryContext.java         | 10 ++++++++--
 .../atlas/notification/NotificationHookConsumer.java     | 16 ++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java
index 2221ac4..d49bc5e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java
@@ -96,11 +96,17 @@ public final class EntityGraphDiscoveryContext {
     }
 
     public AtlasVertex getResolvedEntityVertex(AtlasObjectId objId) {
-        if (objId instanceof AtlasRelatedObjectId) {
+        if (resolvedIdsByUniqAttribs.containsKey(objId)) {
+            return getAtlasVertexFromResolvedIdsByAttribs(objId);
+        } else if (objId instanceof AtlasRelatedObjectId) {
             objId = new AtlasObjectId(objId.getGuid(), objId.getTypeName(), 
objId.getUniqueAttributes());
         }
-        AtlasVertex vertex = resolvedIdsByUniqAttribs.get(objId);
 
+        return getAtlasVertexFromResolvedIdsByAttribs(objId);
+    }
+
+    private AtlasVertex getAtlasVertexFromResolvedIdsByAttribs(AtlasObjectId 
objId) {
+        AtlasVertex vertex = resolvedIdsByUniqAttribs.get(objId);
         // check also for sub-types; ref={typeName=Asset; guid=abcd} should 
match {typeName=hive_table; guid=abcd}
         if (vertex == null) {
             final AtlasEntityType entityType  = 
typeRegistry.getEntityTypeByName(objId.getTypeName());
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 3f1ea05..f02c05f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -117,6 +117,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private static final String TYPE_HIVE_COLUMN_LINEAGE = 
"hive_column_lineage";
     private static final String ATTRIBUTE_INPUTS         = "inputs";
     private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+    private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = 
"JanusGraphException";
+    private static final String 
EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
 
     // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
     public static final String DUMMY_DATABASE               = 
"_dummy_database";
@@ -624,6 +626,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                 }
 
                 // Used for intermediate conversions during create and update
+                String exceptionClassName = StringUtils.EMPTY;
                 for (int numRetries = 0; numRetries < maxRetries; 
numRetries++) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("handleMessage({}): attempt {}", 
message.getType().name(), numRetries);
@@ -783,9 +786,15 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                 throw new IllegalStateException("Unknown 
notification type: " + message.getType().name());
                         }
 
+                        if (StringUtils.isNotEmpty(exceptionClassName)) {
+                            LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} 
ms. Handled!",
+                                                exceptionClassName, 
numRetries, adaptiveWaiter.waitDuration);
+                            exceptionClassName = StringUtils.EMPTY;
+                        }
                         break;
                     } catch (Throwable e) {
                         RequestContext.get().resetEntityGuidUpdates();
+                        exceptionClassName = e.getClass().getSimpleName();
 
                         if (numRetries == (maxRetries - 1)) {
                             String strMessage = 
AbstractNotification.getMessageJson(message);
@@ -800,6 +809,13 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                 recordFailedMessages();
                             }
                             return;
+                        } else if (e instanceof 
org.apache.atlas.repository.graphdb.AtlasSchemaViolationException
+                                        || 
exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION)
+                                        || 
exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION)) {
+                            LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} 
ms. {}",
+                                    exceptionClassName, numRetries, 
adaptiveWaiter.waitDuration, e.getMessage());
+
+                            adaptiveWaiter.pause((Exception) e);
                         } else {
                             LOG.warn("Error handling message", e);
 

Reply via email to