This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b4796e6  GEODE-5379: Reset affinityRetryCount to zero only when 
initial retry count is zero to avoid stack overflow (#2098)
b4796e6 is described below

commit b4796e6d6667ba2815871ea7f81d48fb92af87b1
Author: pivotal-eshu <e...@pivotal.io>
AuthorDate: Wed Jul 11 09:13:21 2018 -0700

    GEODE-5379: Reset affinityRetryCount to zero only when initial retry count 
is zero to avoid stack overflow (#2098)
---
 .../cache/client/internal/OpExecutorImpl.java      | 97 ++++++++++++----------
 .../geode/cache/client/internal/TXFailoverOp.java  |  2 +-
 .../client/internal/OpExecutorImplJUnitTest.java   | 82 ++++++++++++++++++
 3 files changed, 136 insertions(+), 45 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index 4116dde..7ae4745 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -75,7 +75,7 @@ public class OpExecutorImpl implements ExecutablePool {
 
   private static final boolean TRY_SERVERS_ONCE =
       Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + 
"PoolImpl.TRY_SERVERS_ONCE");
-  private static final int TX_RETRY_ATTEMPT =
+  static final int TX_RETRY_ATTEMPT =
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "txRetryAttempt", 
500);
 
   private final ConnectionManager connectionManager;
@@ -102,6 +102,7 @@ public class OpExecutorImpl implements ExecutablePool {
   private boolean serverAffinityFailover = false;
   private final ThreadLocal<ServerLocation> affinityServerLocation =
       new ThreadLocal<ServerLocation>();
+
   private final ThreadLocal<Integer> affinityRetryCount = new 
ThreadLocal<Integer>() {
     protected Integer initialValue() {
       return 0;
@@ -226,53 +227,53 @@ public class OpExecutorImpl implements ExecutablePool {
    * @param op the op to execute
    * @return the result of execution
    */
-  private Object executeWithServerAffinity(ServerLocation loc, Op op) {
+  Object executeWithServerAffinity(ServerLocation loc, Op op) {
+    final int initialRetryCount = getAffinityRetryCount();
     try {
-      Object retVal = executeOnServer(loc, op, true, false);
-      affinityRetryCount.set(0);
-      return retVal;
-    } catch (ServerConnectivityException e) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("caught exception while executing with affinity:{}", 
e.getMessage(), e);
-      }
-      if (!this.serverAffinityFailover || e instanceof 
ServerOperationException) {
-        affinityRetryCount.set(0);
-        throw e;
+      try {
+        return executeOnServer(loc, op, true, false);
+      } catch (ServerConnectivityException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("caught exception while executing with affinity:{}", 
e.getMessage(), e);
+        }
+        if (!this.serverAffinityFailover || e instanceof 
ServerOperationException) {
+          throw e;
+        }
+        int retryCount = getAffinityRetryCount();
+        if ((retryAttempts != -1 && retryCount >= retryAttempts)
+            || retryCount > TX_RETRY_ATTEMPT) {
+          // prevent stack overflow
+          throw e;
+        }
+        setAffinityRetryCount(retryCount + 1);
       }
-      int retryCount = affinityRetryCount.get();
-      if ((retryAttempts != -1 && retryCount >= retryAttempts) || retryCount > 
TX_RETRY_ATTEMPT) { // prevent
-                                                                               
                    // stack
-                                                                               
                    // overflow
-                                                                               
                    // fixes
-                                                                               
                    // bug
-                                                                               
                    // 46535
-        affinityRetryCount.set(0);
-        throw e;
+      this.affinityServerLocation.set(null);
+      if (logger.isDebugEnabled()) {
+        logger.debug("reset server affinity: attempting txFailover");
+      }
+      // send TXFailoverOp, so that new server can
+      // do bootstrapping, then re-execute original op
+      AbstractOp absOp = (AbstractOp) op;
+      absOp.getMessage().setIsRetry();
+      int transactionId = absOp.getMessage().getTransactionId();
+      // for CommitOp we do not have transactionId in AbstractOp
+      // so set it explicitly for TXFailoverOp
+      TXFailoverOp.execute(this.pool, transactionId);
+
+      if (op instanceof ExecuteRegionFunctionOpImpl) {
+        op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op,
+            (byte) 1/* isReExecute */, new HashSet<String>());
+        ((ExecuteRegionFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
+      } else if (op instanceof ExecuteFunctionOpImpl) {
+        op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* 
isReExecute */);
+        ((ExecuteFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
+      }
+      return this.pool.execute(op);
+    } finally {
+      if (initialRetryCount == 0) {
+        setAffinityRetryCount(0);
       }
-      affinityRetryCount.set(retryCount + 1);
     }
-    this.affinityServerLocation.set(null);
-    if (logger.isDebugEnabled()) {
-      logger.debug("reset server affinity: attempting txFailover");
-    }
-    // send TXFailoverOp, so that new server can
-    // do bootstrapping, then re-execute original op
-    AbstractOp absOp = (AbstractOp) op;
-    absOp.getMessage().setIsRetry();
-    int transactionId = absOp.getMessage().getTransactionId();
-    // for CommitOp we do not have transactionId in AbstractOp
-    // so set it explicitly for TXFailoverOp
-    TXFailoverOp.execute(this.pool, transactionId);
-
-    if (op instanceof ExecuteRegionFunctionOpImpl) {
-      op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op,
-          (byte) 1/* isReExecute */, new HashSet<String>());
-      ((ExecuteRegionFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
-    } else if (op instanceof ExecuteFunctionOpImpl) {
-      op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* 
isReExecute */);
-      ((ExecuteFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
-    }
-    return this.pool.execute(op);
   }
 
   public void setupServerAffinity(boolean allowFailover) {
@@ -295,6 +296,14 @@ public class OpExecutorImpl implements ExecutablePool {
     return this.affinityServerLocation.get();
   }
 
+  int getAffinityRetryCount() {
+    return affinityRetryCount.get();
+  }
+
+  void setAffinityRetryCount(int retryCount) {
+    affinityRetryCount.set(retryCount);
+  }
+
   public void setServerAffinityLocation(ServerLocation serverLocation) {
     assert this.affinityServerLocation.get() == null;
     this.affinityServerLocation.set(serverLocation);
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java
index 90714c1..5786999 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java
@@ -33,7 +33,7 @@ public class TXFailoverOp {
     // no instance
   }
 
-  private static class TXFailoverOpImpl extends AbstractOp {
+  protected static class TXFailoverOpImpl extends AbstractOp {
     int txId;
 
     protected TXFailoverOpImpl(int txId) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
index 5478292..3b9afd7 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
@@ -14,8 +14,16 @@
  */
 package org.apache.geode.cache.client.internal;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -42,6 +50,7 @@ import 
org.apache.geode.cache.client.internal.pooling.ConnectionManager;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
 import org.apache.geode.internal.logging.InternalLogWriter;
 import org.apache.geode.internal.logging.LocalLogWriter;
@@ -458,6 +467,79 @@ public class OpExecutorImplJUnitTest {
     assertEquals(0, returns);
   }
 
+  @Test
+  public void executeWithServerAffinityDoesNotChangeInitialRetryCountOfZero() {
+    OpExecutorImpl opExecutor =
+        new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 
-1,
+            10, true, cancelCriterion, mock(PoolImpl.class));
+    Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    opExecutor.setAffinityRetryCount(0);
+
+    opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+    assertEquals(0, opExecutor.getAffinityRetryCount());
+  }
+
+  @Test
+  public void 
executeWithServerAffinityWithNonZeroAffinityRetryCountWillNotSetToZero() {
+    OpExecutorImpl opExecutor =
+        new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 
-1,
+            10, true, cancelCriterion, mock(PoolImpl.class));
+
+    Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    opExecutor.setAffinityRetryCount(1);
+
+    opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+    assertNotEquals(0, opExecutor.getAffinityRetryCount());
+  }
+
+  @Test
+  public void 
executeWithServerAffinityWithServerConnectivityExceptionIncrementsRetryCountAndResetsToZero()
 {
+    OpExecutorImpl opExecutor =
+        spy(new OpExecutorImpl(manager, queueManager, endpointManager, 
riTracker, -1,
+            10, true, cancelCriterion, mock(PoolImpl.class)));
+
+    Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    ServerConnectivityException serverConnectivityException = new 
ServerConnectivityException();
+
+    
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
+        txSynchronizationOp, true, false);
+    opExecutor.setupServerAffinity(true);
+    when(((AbstractOp) 
txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
+    opExecutor.setAffinityRetryCount(0);
+
+    opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+    verify(opExecutor, times(1)).setAffinityRetryCount(1);
+    assertEquals(0, opExecutor.getAffinityRetryCount());
+  }
+
+  @Test
+  public void 
executeWithServerAffinityAndRetryCountGreaterThansTxRetryAttemptThrowsServerConnectivityException()
 {
+    OpExecutorImpl opExecutor =
+        spy(new OpExecutorImpl(manager, queueManager, endpointManager, 
riTracker, -1,
+            10, true, cancelCriterion, mock(PoolImpl.class)));
+
+    Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    ServerConnectivityException serverConnectivityException = new 
ServerConnectivityException();
+
+    
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
+        txSynchronizationOp, true, false);
+    opExecutor.setupServerAffinity(true);
+    when(((AbstractOp) 
txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
+    opExecutor.setAffinityRetryCount(opExecutor.TX_RETRY_ATTEMPT + 1);
+
+    assertThatThrownBy(
+        () -> opExecutor.executeWithServerAffinity(serverLocation, 
txSynchronizationOp))
+            .isSameAs(serverConnectivityException);
+  }
+
+
   private class DummyManager implements ConnectionManager {
 
     protected int numServers = Integer.MAX_VALUE;

Reply via email to