This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new b4796e6 GEODE-5379: Reset affinityRetryCount to zero only when
initial retry count is zero to avoid stack overflow (#2098)
b4796e6 is described below
commit b4796e6d6667ba2815871ea7f81d48fb92af87b1
Author: pivotal-eshu <[email protected]>
AuthorDate: Wed Jul 11 09:13:21 2018 -0700
GEODE-5379: Reset affinityRetryCount to zero only when initial retry count
is zero to avoid stack overflow (#2098)
---
.../cache/client/internal/OpExecutorImpl.java | 97 ++++++++++++----------
.../geode/cache/client/internal/TXFailoverOp.java | 2 +-
.../client/internal/OpExecutorImplJUnitTest.java | 82 ++++++++++++++++++
3 files changed, 136 insertions(+), 45 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index 4116dde..7ae4745 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -75,7 +75,7 @@ public class OpExecutorImpl implements ExecutablePool {
private static final boolean TRY_SERVERS_ONCE =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"PoolImpl.TRY_SERVERS_ONCE");
- private static final int TX_RETRY_ATTEMPT =
+ static final int TX_RETRY_ATTEMPT =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "txRetryAttempt",
500);
private final ConnectionManager connectionManager;
@@ -102,6 +102,7 @@ public class OpExecutorImpl implements ExecutablePool {
private boolean serverAffinityFailover = false;
private final ThreadLocal<ServerLocation> affinityServerLocation =
new ThreadLocal<ServerLocation>();
+
private final ThreadLocal<Integer> affinityRetryCount = new
ThreadLocal<Integer>() {
protected Integer initialValue() {
return 0;
@@ -226,53 +227,53 @@ public class OpExecutorImpl implements ExecutablePool {
* @param op the op to execute
* @return the result of execution
*/
- private Object executeWithServerAffinity(ServerLocation loc, Op op) {
+ Object executeWithServerAffinity(ServerLocation loc, Op op) {
+ final int initialRetryCount = getAffinityRetryCount();
try {
- Object retVal = executeOnServer(loc, op, true, false);
- affinityRetryCount.set(0);
- return retVal;
- } catch (ServerConnectivityException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("caught exception while executing with affinity:{}",
e.getMessage(), e);
- }
- if (!this.serverAffinityFailover || e instanceof
ServerOperationException) {
- affinityRetryCount.set(0);
- throw e;
+ try {
+ return executeOnServer(loc, op, true, false);
+ } catch (ServerConnectivityException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("caught exception while executing with affinity:{}",
e.getMessage(), e);
+ }
+ if (!this.serverAffinityFailover || e instanceof
ServerOperationException) {
+ throw e;
+ }
+ int retryCount = getAffinityRetryCount();
+ if ((retryAttempts != -1 && retryCount >= retryAttempts)
+ || retryCount > TX_RETRY_ATTEMPT) {
+ // prevent stack overflow
+ throw e;
+ }
+ setAffinityRetryCount(retryCount + 1);
}
- int retryCount = affinityRetryCount.get();
- if ((retryAttempts != -1 && retryCount >= retryAttempts) || retryCount > TX_RETRY_ATTEMPT) { // prevent
- // stack
- // overflow
- // fixes
- // bug
- // 46535
- affinityRetryCount.set(0);
- throw e;
+ this.affinityServerLocation.set(null);
+ if (logger.isDebugEnabled()) {
+ logger.debug("reset server affinity: attempting txFailover");
+ }
+ // send TXFailoverOp, so that new server can
+ // do bootstrapping, then re-execute original op
+ AbstractOp absOp = (AbstractOp) op;
+ absOp.getMessage().setIsRetry();
+ int transactionId = absOp.getMessage().getTransactionId();
+ // for CommitOp we do not have transactionId in AbstractOp
+ // so set it explicitly for TXFailoverOp
+ TXFailoverOp.execute(this.pool, transactionId);
+
+ if (op instanceof ExecuteRegionFunctionOpImpl) {
+ op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op,
+ (byte) 1/* isReExecute */, new HashSet<String>());
+ ((ExecuteRegionFunctionOpImpl) op).getMessage().setTransactionId(transactionId);
+ } else if (op instanceof ExecuteFunctionOpImpl) {
+ op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* isReExecute */);
+ ((ExecuteFunctionOpImpl) op).getMessage().setTransactionId(transactionId);
+ }
+ return this.pool.execute(op);
+ } finally {
+ if (initialRetryCount == 0) {
+ setAffinityRetryCount(0);
}
- affinityRetryCount.set(retryCount + 1);
}
- this.affinityServerLocation.set(null);
- if (logger.isDebugEnabled()) {
- logger.debug("reset server affinity: attempting txFailover");
- }
- // send TXFailoverOp, so that new server can
- // do bootstrapping, then re-execute original op
- AbstractOp absOp = (AbstractOp) op;
- absOp.getMessage().setIsRetry();
- int transactionId = absOp.getMessage().getTransactionId();
- // for CommitOp we do not have transactionId in AbstractOp
- // so set it explicitly for TXFailoverOp
- TXFailoverOp.execute(this.pool, transactionId);
-
- if (op instanceof ExecuteRegionFunctionOpImpl) {
- op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op,
- (byte) 1/* isReExecute */, new HashSet<String>());
- ((ExecuteRegionFunctionOpImpl)
op).getMessage().setTransactionId(transactionId);
- } else if (op instanceof ExecuteFunctionOpImpl) {
- op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/*
isReExecute */);
- ((ExecuteFunctionOpImpl)
op).getMessage().setTransactionId(transactionId);
- }
- return this.pool.execute(op);
}
public void setupServerAffinity(boolean allowFailover) {
@@ -295,6 +296,14 @@ public class OpExecutorImpl implements ExecutablePool {
return this.affinityServerLocation.get();
}
+ int getAffinityRetryCount() {
+ return affinityRetryCount.get();
+ }
+
+ void setAffinityRetryCount(int retryCount) {
+ affinityRetryCount.set(retryCount);
+ }
+
public void setServerAffinityLocation(ServerLocation serverLocation) {
assert this.affinityServerLocation.get() == null;
this.affinityServerLocation.set(serverLocation);
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java
index 90714c1..5786999 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java
@@ -33,7 +33,7 @@ public class TXFailoverOp {
// no instance
}
- private static class TXFailoverOpImpl extends AbstractOp {
+ protected static class TXFailoverOpImpl extends AbstractOp {
int txId;
protected TXFailoverOpImpl(int txId) {
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
index 5478292..3b9afd7 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
@@ -14,8 +14,16 @@
*/
package org.apache.geode.cache.client.internal;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
@@ -42,6 +50,7 @@ import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LocalLogWriter;
@@ -458,6 +467,79 @@ public class OpExecutorImplJUnitTest {
assertEquals(0, returns);
}
+ @Test
+ public void executeWithServerAffinityDoesNotChangeInitialRetryCountOfZero() {
+ OpExecutorImpl opExecutor =
+ new OpExecutorImpl(manager, queueManager, endpointManager, riTracker,
-1,
+ 10, true, cancelCriterion, mock(PoolImpl.class));
+ Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+ ServerLocation serverLocation = mock(ServerLocation.class);
+ opExecutor.setAffinityRetryCount(0);
+
+ opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+ assertEquals(0, opExecutor.getAffinityRetryCount());
+ }
+
+ @Test
+ public void
executeWithServerAffinityWithNonZeroAffinityRetryCountWillNotSetToZero() {
+ OpExecutorImpl opExecutor =
+ new OpExecutorImpl(manager, queueManager, endpointManager, riTracker,
-1,
+ 10, true, cancelCriterion, mock(PoolImpl.class));
+
+ Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+ ServerLocation serverLocation = mock(ServerLocation.class);
+ opExecutor.setAffinityRetryCount(1);
+
+ opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+ assertNotEquals(0, opExecutor.getAffinityRetryCount());
+ }
+
+ @Test
+ public void
executeWithServerAffinityWithServerConnectivityExceptionIncrementsRetryCountAndResetsToZero()
{
+ OpExecutorImpl opExecutor =
+ spy(new OpExecutorImpl(manager, queueManager, endpointManager,
riTracker, -1,
+ 10, true, cancelCriterion, mock(PoolImpl.class)));
+
+ Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+ ServerLocation serverLocation = mock(ServerLocation.class);
+ ServerConnectivityException serverConnectivityException = new
ServerConnectivityException();
+
+
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
+ txSynchronizationOp, true, false);
+ opExecutor.setupServerAffinity(true);
+ when(((AbstractOp)
txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
+ opExecutor.setAffinityRetryCount(0);
+
+ opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp);
+
+ verify(opExecutor, times(1)).setAffinityRetryCount(1);
+ assertEquals(0, opExecutor.getAffinityRetryCount());
+ }
+
+ @Test
+ public void
executeWithServerAffinityAndRetryCountGreaterThansTxRetryAttemptThrowsServerConnectivityException()
{
+ OpExecutorImpl opExecutor =
+ spy(new OpExecutorImpl(manager, queueManager, endpointManager,
riTracker, -1,
+ 10, true, cancelCriterion, mock(PoolImpl.class)));
+
+ Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class);
+ ServerLocation serverLocation = mock(ServerLocation.class);
+ ServerConnectivityException serverConnectivityException = new
ServerConnectivityException();
+
+
doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation,
+ txSynchronizationOp, true, false);
+ opExecutor.setupServerAffinity(true);
+ when(((AbstractOp)
txSynchronizationOp).getMessage()).thenReturn(mock(Message.class));
+ opExecutor.setAffinityRetryCount(opExecutor.TX_RETRY_ATTEMPT + 1);
+
+ assertThatThrownBy(
+ () -> opExecutor.executeWithServerAffinity(serverLocation,
txSynchronizationOp))
+ .isSameAs(serverConnectivityException);
+ }
+
+
private class DummyManager implements ConnectionManager {
protected int numServers = Integer.MAX_VALUE;