Repository: atlas
Updated Branches:
  refs/heads/master 6e7aa6ed3 -> 479b9ab1c


ATLAS-2667: Enhance GraphTransactionInterceptor to deal with nested/inner commits

Change-Id: I9ea29deb9aea226f077f4d008d459fdb3ac6663f


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/479b9ab1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/479b9ab1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/479b9ab1

Branch: refs/heads/master
Commit: 479b9ab1c0d461e89d14d2d3426af64f3cb9acb9
Parents: 6e7aa6e
Author: apoorvnaik <apoorvn...@apache.org>
Authored: Tue May 8 15:28:47 2018 -0700
Committer: apoorvnaik <apoorvn...@apache.org>
Committed: Thu May 10 07:58:06 2018 -0700

----------------------------------------------------------------------
 .../atlas/GraphTransactionInterceptor.java      | 102 +++++++++++++++----
 1 file changed, 83 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/479b9ab1/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index b3e690f..4c43677 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -21,14 +21,15 @@ import com.google.common.annotations.VisibleForTesting;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.exception.NotFoundException;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import javax.ws.rs.core.Response;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -41,8 +42,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
     private static final Logger LOG = 
LoggerFactory.getLogger(GraphTransactionInterceptor.class);
 
     @VisibleForTesting
-    private static final ObjectUpdateSynchronizer OBJECT_UPDATE_SYNCHRONIZER = 
new ObjectUpdateSynchronizer();
-    private static final ThreadLocal<List<PostTransactionHook>> 
postTransactionHooks = new ThreadLocal<>();
+    private static final ObjectUpdateSynchronizer               
OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer();
+    private static final ThreadLocal<List<PostTransactionHook>> 
postTransactionHooks       = new ThreadLocal<>();
+    private static final ThreadLocal<Boolean>                   isTxnOpen      
            = ThreadLocal.withInitial(() -> Boolean.FALSE);
+    private static final ThreadLocal<Boolean>                   innerFailure   
            = ThreadLocal.withInitial(() -> Boolean.FALSE);
 
     private final AtlasGraph graph;
 
@@ -53,39 +56,72 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
 
     @Override
     public Object invoke(MethodInvocation invocation) throws Throwable {
+        Method        method            = invocation.getMethod();
+        String        invokingClass     = 
method.getDeclaringClass().getSimpleName();
+        String        invokedMethodName = method.getName();
+
+        boolean isInnerTxn = isTxnOpen.get();
+        // Outermost txn marks any subsequent transaction as inner
+        isTxnOpen.set(Boolean.TRUE);
+
+        if (LOG.isDebugEnabled() && isInnerTxn) {
+            LOG.debug("Txn entry-point {}.{} is inner txn. Commit/Rollback 
will be ignored", invokingClass, invokedMethodName);
+        }
+
         boolean isSuccess = false;
 
         try {
             try {
                 Object response = invocation.proceed();
-                graph.commit();
-                isSuccess = true;
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("graph commit");
+                if (isInnerTxn) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring commit for nested/inner 
transaction {}.{}", invokingClass, invokedMethodName);
+                    }
+                } else {
+                    doCommitOrRollback(invokingClass, invokedMethodName);
                 }
 
+                isSuccess = !innerFailure.get();
+
                 return response;
             } catch (Throwable t) {
-                if (logException(t)) {
-                    LOG.error("graph rollback due to exception ", t);
+                if (isInnerTxn) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring rollback for nested/inner 
transaction {}.{}", invokingClass, invokedMethodName);
+                    }
+                    innerFailure.set(true);
                 } else {
-                    LOG.error("graph rollback due to exception {}:{}", 
t.getClass().getSimpleName(), t.getMessage());
+                    doRollback(t);
                 }
-                graph.rollback();
                 throw t;
             }
         } finally {
-            List<PostTransactionHook> trxHooks = postTransactionHooks.get();
+            // Only outer txn can mark as closed
+            if (!isInnerTxn) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing outer txn");
+                }
+
+                // Reset the boolean flags
+                isTxnOpen.set(Boolean.FALSE);
+                innerFailure.set(Boolean.FALSE);
+
+                List<PostTransactionHook> trxHooks = 
postTransactionHooks.get();
 
-            if (trxHooks != null) {
-                postTransactionHooks.remove();
+                if (trxHooks != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Processing post-txn hooks");
+                    }
+
+                    postTransactionHooks.remove();
 
-                for (PostTransactionHook trxHook : trxHooks) {
-                    try {
-                        trxHook.onComplete(isSuccess);
-                    } catch (Throwable t) {
-                        LOG.error("postTransactionHook failed", t);
+                    for (PostTransactionHook trxHook : trxHooks) {
+                        try {
+                            trxHook.onComplete(isSuccess);
+                        } catch (Throwable t) {
+                            LOG.error("postTransactionHook failed", t);
+                        }
                     }
                 }
             }
@@ -94,6 +130,34 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         }
     }
 
+    private void doCommitOrRollback(final String invokingClass, final String 
invokedMethodName) {
+        if (innerFailure.get()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Inner/Nested call threw exception. Rollback on txn 
entry-point, {}.{}", invokingClass, invokedMethodName);
+            }
+            graph.rollback();
+        } else {
+            doCommit(invokingClass, invokedMethodName);
+        }
+    }
+
+    private void doCommit(final String invokingClass, final String 
invokedMethodName) {
+        graph.commit();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Graph commit txn {}.{}", invokingClass, 
invokedMethodName);
+        }
+    }
+
+    private void doRollback(final Throwable t) {
+        if (logException(t)) {
+            LOG.error("graph rollback due to exception ", t);
+        } else {
+            LOG.error("graph rollback due to exception {}:{}", 
t.getClass().getSimpleName(), t.getMessage());
+        }
+        graph.rollback();
+    }
+
     public static void lockObjectAndReleasePostCommit(final String guid) {
         OBJECT_UPDATE_SYNCHRONIZER.lockObject(guid);
     }

Reply via email to