This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5379
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5379 by this push:
     new d7a6988  GEODE-5379: reset affinityRetryCount to zero only when initial retry count is zero
d7a6988 is described below

commit d7a698851e369e12cfeb0065a1bb938eccc8f7ae
Author: eshu <[email protected]>
AuthorDate: Tue Jul 10 15:16:48 2018 -0700

    GEODE-5379: reset affinityRetryCount to zero only when initial retry count is zero
---
 .../cache/client/internal/OpExecutorImpl.java      | 96 ++++++++++++----------
 .../client/internal/OpExecutorImplJUnitTest.java   | 76 ++++++++++++++---
 2 files changed, 114 insertions(+), 58 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index d8980a6..70ffea5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -75,7 +75,7 @@ public class OpExecutorImpl implements ExecutablePool {
 
   private static final boolean TRY_SERVERS_ONCE =
       Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + 
"PoolImpl.TRY_SERVERS_ONCE");
-  private static final int TX_RETRY_ATTEMPT =
+  static final int TX_RETRY_ATTEMPT =
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "txRetryAttempt", 
500);
 
   private final ConnectionManager connectionManager;
@@ -102,6 +102,7 @@ public class OpExecutorImpl implements ExecutablePool {
   private boolean serverAffinityFailover = false;
   private final ThreadLocal<ServerLocation> affinityServerLocation =
       new ThreadLocal<ServerLocation>();
+
   private final ThreadLocal<Integer> affinityRetryCount = new 
ThreadLocal<Integer>() {
     protected Integer initialValue() {
       return 0;
@@ -226,52 +227,54 @@ public class OpExecutorImpl implements ExecutablePool {
    * @param op the op to execute
    * @return the result of execution
    */
-  private Object executeWithServerAffinity(ServerLocation loc, Op op) {
+  Object executeWithServerAffinity(ServerLocation loc, Op op) {
+    final int initialRetryCount = getAffinityRetryCount();
     try {
-      Object retVal = executeOnServer(loc, op, true, false);
-      return retVal;
-    } catch (ServerConnectivityException e) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("caught exception while executing with affinity:{}", 
e.getMessage(), e);
-      }
-      if (!this.serverAffinityFailover || e instanceof 
ServerOperationException) {
-        affinityRetryCount.set(0);
-        throw e;
+      try {
+        Object retVal = executeOnServer(loc, op, true, false);
+        return retVal;
+      } catch (ServerConnectivityException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("caught exception while executing with affinity:{}", 
e.getMessage(), e);
+        }
+        if (!this.serverAffinityFailover || e instanceof 
ServerOperationException) {
+          throw e;
+        }
+        int retryCount = getAffinityRetryCount();
+        if ((retryAttempts != -1 && retryCount >= retryAttempts)
+            || retryCount > TX_RETRY_ATTEMPT) {
+          // prevent stack overflow
+          throw e;
+        }
+        setAffinityRetryCount(retryCount + 1);
       }
-      int retryCount = affinityRetryCount.get();
-      if ((retryAttempts != -1 && retryCount >= retryAttempts) || retryCount > 
TX_RETRY_ATTEMPT) { // prevent
-                                                                               
                    // stack
-                                                                               
                    // overflow
-                                                                               
                    // fixes
-                                                                               
                    // bug
-                                                                               
                    // 46535
-        affinityRetryCount.set(0);
-        throw e;
+      this.affinityServerLocation.set(null);
+      if (logger.isDebugEnabled()) {
+        logger.debug("reset server affinity: attempting txFailover");
+      }
+      // send TXFailoverOp, so that new server can
+      // do bootstrapping, then re-execute original op
+      AbstractOp absOp = (AbstractOp) op;
+      absOp.getMessage().setIsRetry();
+      int transactionId = absOp.getMessage().getTransactionId();
+      // for CommitOp we do not have transactionId in AbstractOp
+      // so set it explicitly for TXFailoverOp
+      TXFailoverOp.execute(this.pool, transactionId);
+
+      if (op instanceof ExecuteRegionFunctionOpImpl) {
+        op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op,
+            (byte) 1/* isReExecute */, new HashSet<String>());
+        ((ExecuteRegionFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
+      } else if (op instanceof ExecuteFunctionOpImpl) {
+        op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* 
isReExecute */);
+        ((ExecuteFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
+      }
+      return this.pool.execute(op);
+    } finally {
+      if (initialRetryCount == 0) {
+        setAffinityRetryCount(0);
       }
-      affinityRetryCount.set(retryCount + 1);
-    }
-    this.affinityServerLocation.set(null);
-    if (logger.isDebugEnabled()) {
-      logger.debug("reset server affinity: attempting txFailover");
     }
-    // send TXFailoverOp, so that new server can
-    // do bootstrapping, then re-execute original op
-    AbstractOp absOp = (AbstractOp) op;
-    absOp.getMessage().setIsRetry();
-    int transactionId = absOp.getMessage().getTransactionId();
-    // for CommitOp we do not have transactionId in AbstractOp
-    // so set it explicitly for TXFailoverOp
-    TXFailoverOp.execute(this.pool, transactionId);
-
-    if (op instanceof ExecuteRegionFunctionOpImpl) {
-      op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op,
-          (byte) 1/* isReExecute */, new HashSet<String>());
-      ((ExecuteRegionFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
-    } else if (op instanceof ExecuteFunctionOpImpl) {
-      op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* 
isReExecute */);
-      ((ExecuteFunctionOpImpl) 
op).getMessage().setTransactionId(transactionId);
-    }
-    return this.pool.execute(op);
   }
 
   public void setupServerAffinity(boolean allowFailover) {
@@ -294,11 +297,14 @@ public class OpExecutorImpl implements ExecutablePool {
     return this.affinityServerLocation.get();
   }
 
-  // for test only
-  protected int getAffinityRetryCount() {
+  int getAffinityRetryCount() {
     return affinityRetryCount.get();
   }
 
+  void setAffinityRetryCount(int retryCount) {
+    affinityRetryCount.set(retryCount);
+  }
+
   public void setServerAffinityLocation(ServerLocation serverLocation) {
     assert this.affinityServerLocation.get() == null;
     this.affinityServerLocation.set(serverLocation);
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
index 3af8332..3b9afd7 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
@@ -14,12 +14,15 @@
  */
 package org.apache.geode.cache.client.internal;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -465,31 +468,78 @@ public class OpExecutorImplJUnitTest {
   }
 
   @Test
-  public void executeWithServerAffinityDoesNotResetAffinityRetryCount() {
+  public void executeWithServerAffinityDoesNotChangeInitialRetryCountOfZero() {
+    OpExecutorImpl opExecutor =
+        new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 
-1,
+            10, true, cancelCriterion, mock(PoolImpl.class));
+    Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    opExecutor.setAffinityRetryCount(0);
+
+    opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+    assertEquals(0, opExecutor.getAffinityRetryCount());
+  }
+
+  @Test
+  public void 
executeWithServerAffinityWithNonZeroAffinityRetryCountWillNotSetToZero() {
+    OpExecutorImpl opExecutor =
+        new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, 
-1,
+            10, true, cancelCriterion, mock(PoolImpl.class));
+
+    Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    opExecutor.setAffinityRetryCount(1);
+
+    opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+    assertNotEquals(0, opExecutor.getAffinityRetryCount());
+  }
+
+  @Test
+  public void 
executeWithServerAffinityWithServerConnectivityExceptionIncrementsRetryCountAndResetsToZero()
 {
     OpExecutorImpl opExecutor =
         spy(new OpExecutorImpl(manager, queueManager, endpointManager, 
riTracker, -1,
             10, true, cancelCriterion, mock(PoolImpl.class)));
+
+    Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    ServerConnectivityException serverConnectivityException = new 
ServerConnectivityException();
+
+    
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
+        txSynchronizationOp, true, false);
     opExecutor.setupServerAffinity(true);
+    when(((AbstractOp) 
txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
+    opExecutor.setAffinityRetryCount(0);
+
+    opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+    verify(opExecutor, times(1)).setAffinityRetryCount(1);
+    assertEquals(0, opExecutor.getAffinityRetryCount());
+  }
+
+  @Test
+  public void 
executeWithServerAffinityAndRetryCountGreaterThansTxRetryAttemptThrowsServerConnectivityException()
 {
+    OpExecutorImpl opExecutor =
+        spy(new OpExecutorImpl(manager, queueManager, endpointManager, 
riTracker, -1,
+            10, true, cancelCriterion, mock(PoolImpl.class)));
 
     Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
     ServerLocation serverLocation = mock(ServerLocation.class);
-    ServerConnectivityException serverConnectivityException =
-        mock(ServerConnectivityException.class);
+    ServerConnectivityException serverConnectivityException = new 
ServerConnectivityException();
 
-    doReturn(serverLocation).when(opExecutor).getNextOpServerLocation();
     
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
         txSynchronizationOp, true, false);
+    opExecutor.setupServerAffinity(true);
     when(((AbstractOp) 
txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
-    opExecutor.execute(txSynchronizationOp);
-    assertEquals(1, opExecutor.getAffinityRetryCount());
-
-    Op txFailoverOp = mock(TXFailoverOp.TXFailoverOpImpl.class);
-    doReturn(new Object()).when(opExecutor).executeOnServer(serverLocation, 
txFailoverOp, true,
-        false);
-    opExecutor.execute(txFailoverOp);
-    assertEquals(1, opExecutor.getAffinityRetryCount());
+    opExecutor.setAffinityRetryCount(opExecutor.TX_RETRY_ATTEMPT + 1);
+
+    assertThatThrownBy(
+        () -> opExecutor.executeWithServerAffinity(serverLocation, 
txSynchronizationOp))
+            .isSameAs(serverConnectivityException);
   }
 
+
   private class DummyManager implements ConnectionManager {
 
     protected int numServers = Integer.MAX_VALUE;

Reply via email to