This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new b4796e6 GEODE-5379: Reset affinityRetryCount to zero only when initial retry count is zero to avoid stack overflow (#2098) b4796e6 is described below commit b4796e6d6667ba2815871ea7f81d48fb92af87b1 Author: pivotal-eshu <e...@pivotal.io> AuthorDate: Wed Jul 11 09:13:21 2018 -0700 GEODE-5379: Reset affinityRetryCount to zero only when initial retry count is zero to avoid stack overflow (#2098) --- .../cache/client/internal/OpExecutorImpl.java | 97 ++++++++++++---------- .../geode/cache/client/internal/TXFailoverOp.java | 2 +- .../client/internal/OpExecutorImplJUnitTest.java | 82 ++++++++++++++++++ 3 files changed, 136 insertions(+), 45 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java index 4116dde..7ae4745 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java @@ -75,7 +75,7 @@ public class OpExecutorImpl implements ExecutablePool { private static final boolean TRY_SERVERS_ONCE = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PoolImpl.TRY_SERVERS_ONCE"); - private static final int TX_RETRY_ATTEMPT = + static final int TX_RETRY_ATTEMPT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "txRetryAttempt", 500); private final ConnectionManager connectionManager; @@ -102,6 +102,7 @@ public class OpExecutorImpl implements ExecutablePool { private boolean serverAffinityFailover = false; private final ThreadLocal<ServerLocation> affinityServerLocation = new ThreadLocal<ServerLocation>(); + private final ThreadLocal<Integer> affinityRetryCount = new ThreadLocal<Integer>() { protected Integer initialValue() { return 0; @@ -226,53 +227,53 @@ public class OpExecutorImpl implements ExecutablePool { * @param op the op to execute * @return the result of execution */ - private Object executeWithServerAffinity(ServerLocation loc, Op op) { + Object executeWithServerAffinity(ServerLocation loc, Op op) { + final int initialRetryCount = getAffinityRetryCount(); try { - Object retVal = executeOnServer(loc, op, true, false); - affinityRetryCount.set(0); - return retVal; - } catch (ServerConnectivityException e) { - if (logger.isDebugEnabled()) { - logger.debug("caught exception while executing with affinity:{}", e.getMessage(), e); - } - if (!this.serverAffinityFailover || e instanceof ServerOperationException) { - affinityRetryCount.set(0); - throw e; + try { + return executeOnServer(loc, op, true, false); + } catch (ServerConnectivityException e) { + if (logger.isDebugEnabled()) { + logger.debug("caught exception while executing with affinity:{}", e.getMessage(), e); + } + if (!this.serverAffinityFailover || e instanceof ServerOperationException) { + throw e; + } + int retryCount = getAffinityRetryCount(); + if ((retryAttempts != -1 && retryCount >= retryAttempts) + || retryCount > TX_RETRY_ATTEMPT) { + // prevent stack overflow + throw e; + } + setAffinityRetryCount(retryCount + 1); } - int retryCount = affinityRetryCount.get(); - if ((retryAttempts != -1 && retryCount >= retryAttempts) || retryCount > TX_RETRY_ATTEMPT) { // prevent - // stack - // overflow - // fixes - // bug - // 46535 - affinityRetryCount.set(0); - throw e; + this.affinityServerLocation.set(null); + if (logger.isDebugEnabled()) { + logger.debug("reset server affinity: attempting txFailover"); + } + // send TXFailoverOp, so that new server can + // do bootstrapping, then re-execute original op + AbstractOp absOp = (AbstractOp) op; + absOp.getMessage().setIsRetry(); + int transactionId = absOp.getMessage().getTransactionId(); + // for CommitOp we do not have transactionId in AbstractOp + // so set it explicitly for TXFailoverOp + TXFailoverOp.execute(this.pool, transactionId); + + if (op instanceof ExecuteRegionFunctionOpImpl) { + op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op, + (byte) 1/* isReExecute */, new HashSet<String>()); + ((ExecuteRegionFunctionOpImpl) op).getMessage().setTransactionId(transactionId); + } else if (op instanceof ExecuteFunctionOpImpl) { + op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* isReExecute */); + ((ExecuteFunctionOpImpl) op).getMessage().setTransactionId(transactionId); + } + return this.pool.execute(op); + } finally { + if (initialRetryCount == 0) { + setAffinityRetryCount(0); } - affinityRetryCount.set(retryCount + 1); } - this.affinityServerLocation.set(null); - if (logger.isDebugEnabled()) { - logger.debug("reset server affinity: attempting txFailover"); - } - // send TXFailoverOp, so that new server can - // do bootstrapping, then re-execute original op - AbstractOp absOp = (AbstractOp) op; - absOp.getMessage().setIsRetry(); - int transactionId = absOp.getMessage().getTransactionId(); - // for CommitOp we do not have transactionId in AbstractOp - // so set it explicitly for TXFailoverOp - TXFailoverOp.execute(this.pool, transactionId); - - if (op instanceof ExecuteRegionFunctionOpImpl) { - op = new ExecuteRegionFunctionOpImpl((ExecuteRegionFunctionOpImpl) op, - (byte) 1/* isReExecute */, new HashSet<String>()); - ((ExecuteRegionFunctionOpImpl) op).getMessage().setTransactionId(transactionId); - } else if (op instanceof ExecuteFunctionOpImpl) { - op = new ExecuteFunctionOpImpl((ExecuteFunctionOpImpl) op, (byte) 1/* isReExecute */); - ((ExecuteFunctionOpImpl) op).getMessage().setTransactionId(transactionId); - } - return this.pool.execute(op); } public void setupServerAffinity(boolean allowFailover) { @@ -295,6 +296,14 @@ public class OpExecutorImpl implements ExecutablePool { return this.affinityServerLocation.get(); } + int getAffinityRetryCount() { + return affinityRetryCount.get(); + } + + void setAffinityRetryCount(int retryCount) { + affinityRetryCount.set(retryCount); + } + public void setServerAffinityLocation(ServerLocation serverLocation) { assert this.affinityServerLocation.get() == null; this.affinityServerLocation.set(serverLocation); diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java index 90714c1..5786999 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/TXFailoverOp.java @@ -33,7 +33,7 @@ public class TXFailoverOp { // no instance } - private static class TXFailoverOpImpl extends AbstractOp { + protected static class TXFailoverOpImpl extends AbstractOp { int txId; protected TXFailoverOpImpl(int txId) { diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java index 5478292..3b9afd7 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java @@ -14,8 +14,16 @@ */ package org.apache.geode.cache.client.internal; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.io.InputStream; @@ -42,6 +50,7 @@ import org.apache.geode.cache.client.internal.pooling.ConnectionManager; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.internal.cache.tier.sockets.Message; import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; import org.apache.geode.internal.logging.InternalLogWriter; import org.apache.geode.internal.logging.LocalLogWriter; @@ -458,6 +467,79 @@ public class OpExecutorImplJUnitTest { assertEquals(0, returns); } + @Test + public void executeWithServerAffinityDoesNotChangeInitialRetryCountOfZero() { + OpExecutorImpl opExecutor = + new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, + 10, true, cancelCriterion, mock(PoolImpl.class)); + Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); + ServerLocation serverLocation = mock(ServerLocation.class); + opExecutor.setAffinityRetryCount(0); + + opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp); + + assertEquals(0, opExecutor.getAffinityRetryCount()); + } + + @Test + public void executeWithServerAffinityWithNonZeroAffinityRetryCountWillNotSetToZero() { + OpExecutorImpl opExecutor = + new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, + 10, true, cancelCriterion, mock(PoolImpl.class)); + + Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); + ServerLocation serverLocation = mock(ServerLocation.class); + opExecutor.setAffinityRetryCount(1); + + opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp); + + assertNotEquals(0, opExecutor.getAffinityRetryCount()); + } + + @Test + public void executeWithServerAffinityWithServerConnectivityExceptionIncrementsRetryCountAndResetsToZero() { + OpExecutorImpl opExecutor = + spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, + 10, true, cancelCriterion, mock(PoolImpl.class))); + + Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); + ServerLocation serverLocation = mock(ServerLocation.class); + ServerConnectivityException serverConnectivityException = new ServerConnectivityException(); + + doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation, + txSynchronizationOp, true, false); + opExecutor.setupServerAffinity(true); + when(((AbstractOp) txSynchronizationOp).getMessage()).thenReturn(mock(Message.class)); + opExecutor.setAffinityRetryCount(0); + + opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp); + + verify(opExecutor, times(1)).setAffinityRetryCount(1); + assertEquals(0, opExecutor.getAffinityRetryCount()); + } + + @Test + public void executeWithServerAffinityAndRetryCountGreaterThansTxRetryAttemptThrowsServerConnectivityException() { + OpExecutorImpl opExecutor = + spy(new OpExecutorImpl(manager, queueManager, endpointManager, riTracker, -1, + 10, true, cancelCriterion, mock(PoolImpl.class))); + + Op txSynchronizationOp = mock(TXSynchronizationOp.Impl.class); + ServerLocation serverLocation = mock(ServerLocation.class); + ServerConnectivityException serverConnectivityException = new ServerConnectivityException(); + + doThrow(serverConnectivityException).when(opExecutor).executeOnServer(serverLocation, + txSynchronizationOp, true, false); + opExecutor.setupServerAffinity(true); + when(((AbstractOp) txSynchronizationOp).getMessage()).thenReturn(mock(Message.class)); + opExecutor.setAffinityRetryCount(opExecutor.TX_RETRY_ATTEMPT + 1); + + assertThatThrownBy( + () -> opExecutor.executeWithServerAffinity(serverLocation, txSynchronizationOp)) + .isSameAs(serverConnectivityException); + } + + private class DummyManager implements ConnectionManager { protected int numServers = Integer.MAX_VALUE;