This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new d77744a GEODE-5186: set operation in a client transaction could cause the transaction to hang (#1967) d77744a is described below commit d77744a4c87ddcc157584119c93fbd300410d442 Author: Dale Emery <d...@dhemery.com> AuthorDate: Wed May 23 13:17:27 2018 -0700 GEODE-5186: set operation in a client transaction could cause the transaction to hang (#1967) GEODE-5186: Do not retry set operation on partitioned region if in a transaction. FetchKeyMessage will be sent to peer members as non transactional if it is sent from the transaction host member. --- .../apache/geode/internal/cache/TXManagerImpl.java | 8 ++ .../cache/partitioned/FetchKeysMessage.java | 93 +++++++++++---- .../cache/tier/sockets/command/KeySet.java | 25 ++++- .../cache/partitioned/FetchKeysMessageTest.java | 125 +++++++++++++++++++++ .../cache/tier/sockets/command/KeySetTest.java | 78 +++++++++++++ 5 files changed, 304 insertions(+), 25 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index ccc68f2..efe1a10 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -195,6 +195,14 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene currentInstance = this; } + public static TXManagerImpl getCurrentInstanceForTest() { + return currentInstance; + } + + public static void setCurrentInstanceForTest(TXManagerImpl instance) { + currentInstance = instance; + } + InternalCache getCache() { return this.cache; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java index 0ae339b..a62b1b4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessage.java @@ -44,6 +44,8 @@ import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.InitialImageOperation; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionDataStore; +import org.apache.geode.internal.cache.TXManagerImpl; +import org.apache.geode.internal.cache.TXStateProxy; import org.apache.geode.internal.cache.tier.InterestType; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; @@ -57,10 +59,14 @@ public class FetchKeysMessage extends PartitionMessage { private Integer bucketId; - /** the interest policy to use in processing the keys */ + /** + * the interest policy to use in processing the keys + */ private int interestType; - /** the argument for the interest type (regex string, className, list of keys) */ + /** + * the argument for the interest type (regex string, className, list of keys) + */ private Object interestArg; private boolean allowTombstones; @@ -93,24 +99,42 @@ public class FetchKeysMessage extends PartitionMessage { public static FetchKeysResponse send(InternalDistributedMember recipient, PartitionedRegion r, Integer bucketId, boolean allowTombstones) throws ForceReattemptException { Assert.assertTrue(recipient != null, "FetchKeysMessage NULL recipient"); - FetchKeysMessage tmp = new FetchKeysMessage(); - FetchKeysResponse p = - (FetchKeysResponse) tmp.createReplyProcessor(r, Collections.singleton(recipient)); - FetchKeysMessage m = new FetchKeysMessage(recipient, r.getPRId(), p, bucketId, - InterestType.REGULAR_EXPRESSION, ".*", allowTombstones); - m.setTransactionDistributed(r.getCache().getTxManager().isDistributed()); + TXManagerImpl txManager = r.getCache().getTxManager(); + boolean resetTxState = isTransactionInternalSuspendNeeded(txManager); + TXStateProxy txStateProxy = null; + if (resetTxState) { + txStateProxy = txManager.pauseTransaction(); + } - Set failures = r.getDistributionManager().putOutgoing(m); - if (failures != null && failures.size() > 0) { - throw new ForceReattemptException( - LocalizedStrings.FetchKeysMessage_FAILED_SENDING_0.toLocalizedString(m)); + try { + FetchKeysMessage tmp = new FetchKeysMessage(); + + FetchKeysResponse p = + (FetchKeysResponse) tmp.createReplyProcessor(r, Collections.singleton(recipient)); + FetchKeysMessage m = new FetchKeysMessage(recipient, r.getPRId(), p, bucketId, + InterestType.REGULAR_EXPRESSION, ".*", allowTombstones); + m.setTransactionDistributed(txManager.isDistributed()); + + Set failures = r.getDistributionManager().putOutgoing(m); + if (failures != null && failures.size() > 0) { + throw new ForceReattemptException( + LocalizedStrings.FetchKeysMessage_FAILED_SENDING_0.toLocalizedString(m)); + } + return p; + } finally { + if (resetTxState) { + txManager.unpauseTransaction(txStateProxy); + } } + } - return p; + private static boolean isTransactionInternalSuspendNeeded(TXManagerImpl txManager) { + TXStateProxy txState = txManager.getTXState(); + // handle distributed transaction when needed. + return txState != null && txState.isRealDealLocal() && !txState.isDistTx(); } /** - * * @return the FetchKeysResponse * @throws ForceReattemptException if the peer is no longer available */ @@ -216,17 +240,29 @@ public class FetchKeysMessage extends PartitionMessage { } public static class FetchKeysReplyMessage extends ReplyMessage { - /** The number of the series */ + /** + * The number of the series + */ int seriesNum; - /** The message number in the series */ + /** + * The message number in the series + */ int msgNum; - /** The total number of series */ + /** + * The total number of series + */ int numSeries; - /** Whether this is the last of a series */ + /** + * Whether this is the last of a series + */ boolean lastInSeries; - /** the stream holding the chunk to send */ + /** + * the stream holding the chunk to send + */ transient HeapDataOutputStream chunkStream; - /** the array holding data received */ + /** + * the array holding data received + */ transient byte[] chunk; /** @@ -440,6 +476,7 @@ public class FetchKeysMessage extends PartitionMessage { return sb.toString(); } } + /** * A processor to capture the value returned by * {@link org.apache.geode.internal.cache.partitioned.GetMessage.GetReplyMessage} @@ -452,16 +489,24 @@ public class FetchKeysMessage extends PartitionMessage { private final Set returnValue; - /** lock used to synchronize chunk processing */ + /** + * lock used to synchronize chunk processing + */ private final Object endLock = new Object(); - /** number of chunks processed */ + /** + * number of chunks processed + */ private volatile int chunksProcessed; - /** chunks expected (set when last chunk has been processed */ + /** + * chunks expected (set when last chunk has been processed + */ private volatile int chunksExpected; - /** whether the last chunk has been processed */ + /** + * whether the last chunk has been processed + */ private volatile boolean lastChunkReceived; public FetchKeysResponse(InternalDistributedSystem ds, PartitionedRegion pr, Set recipients) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/KeySet.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/KeySet.java index 63bfb89..c18da6d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/KeySet.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/KeySet.java @@ -21,8 +21,10 @@ import java.util.List; import java.util.Set; import org.apache.geode.cache.Region; +import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.operations.KeySetOperationContext; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -92,10 +94,23 @@ public class KeySet extends BaseCommand { return; } + if (isInTransaction() && region.getPartitionAttributes() != null) { + // GEODE-5186: fail the the transaction if it is a retry after failover for keySet on + // partitioned region + if (clientMessage.isRetry()) { + keySetWriteChunkedException(clientMessage, + new TransactionException( + "Failover on a set operation of a partitioned region is not allowed in a transaction."), + serverConnection); + serverConnection.setAsTrue(RESPONDED); + return; + } + } + try { securityService.authorize(Resource.DATA, Operation.READ, regionName); } catch (NotAuthorizedException ex) { - writeChunkedException(clientMessage, ex, serverConnection); + keySetWriteChunkedException(clientMessage, ex, serverConnection); serverConnection.setAsTrue(RESPONDED); return; } @@ -149,6 +164,11 @@ public class KeySet extends BaseCommand { } + protected void keySetWriteChunkedException(Message clientMessage, Throwable ex, + ServerConnection serverConnection) throws IOException { + writeChunkedException(clientMessage, ex, serverConnection); + } + private void fillAndSendKeySetResponseChunks(LocalRegion region, String regionName, KeySetOperationContext context, ServerConnection servConn) throws IOException { @@ -199,4 +219,7 @@ public class KeySet extends BaseCommand { chunkedResponseMsg.sendChunk(servConn); } + boolean isInTransaction() { + return TXManagerImpl.getCurrentTXState() != null; + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessageTest.java new file mode 100644 index 0000000..6962e13 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/FetchKeysMessageTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.partitioned; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; + +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.CachePerfStats; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.TXId; +import org.apache.geode.internal.cache.TXManagerImpl; +import org.apache.geode.internal.cache.TXStateProxy; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class FetchKeysMessageTest { + @Mock(answer = RETURNS_DEEP_STUBS) + private InternalCache cache; + @Mock + private DistributionManager distributionManager; + @Mock(answer = RETURNS_DEEP_STUBS) + private InternalDistributedSystem distributedSystem; + @Mock + private InternalDistributedMember recipient; + @Mock(answer = RETURNS_DEEP_STUBS) + private PartitionedRegion region; + @Mock + private TXStateProxy txStateProxy; + @Captor + private ArgumentCaptor<FetchKeysMessage> sentMessage; + private TXManagerImpl originalTxManager; + private TXManagerImpl txManager; + + @Before + public void setup() { + initMocks(this); + + when(cache.getDistributedSystem()).thenReturn(distributedSystem); + when(distributedSystem.getDistributionManager()).thenReturn(distributionManager); + when(region.getCache()).thenReturn(cache); + when(region.getDistributionManager()).thenReturn(distributionManager); + when(txStateProxy.isInProgress()).thenReturn(true); + + originalTxManager = TXManagerImpl.getCurrentInstanceForTest(); + // The constructor sets the new tx manager as currentInstance + txManager = spy(new TXManagerImpl(mock(CachePerfStats.class), cache)); + txManager.setTXState(txStateProxy); + txManager.setDistributed(false); + + when(cache.getTxManager()).thenReturn(txManager); + } + + @After + public void restoreTxManager() { + TXManagerImpl.setCurrentInstanceForTest(originalTxManager); + } + + @Test + public void sendsWithTransactionPaused_ifTransactionIsHostedLocally() throws Exception { + // Transaction is locally hosted + when(txStateProxy.isRealDealLocal()).thenReturn(true); + when(txStateProxy.isDistTx()).thenReturn(false); + + FetchKeysMessage.send(recipient, region, 1, false); + + InOrder inOrder = inOrder(txManager, distributionManager); + inOrder.verify(txManager, times(1)).pauseTransaction(); + inOrder.verify(distributionManager, times(1)).putOutgoing(sentMessage.capture()); + inOrder.verify(txManager, times(1)).unpauseTransaction(same(txStateProxy)); + + assertThat(sentMessage.getValue().getTXUniqId()).isEqualTo(TXManagerImpl.NOTX); + } + + @Test + public void sendsWithoutPausingTransaction_ifTransactionIsNotHostedLocally() throws Exception { + // Transaction is not locally hosted + when(txStateProxy.isRealDealLocal()).thenReturn(false); + + int uniqueId = 99; + TXId txID = new TXId(recipient, uniqueId); + when(txStateProxy.getTxId()).thenReturn(txID); + + FetchKeysMessage.send(recipient, region, 1, false); + + verify(distributionManager, times(1)).putOutgoing(sentMessage.capture()); + assertThat(sentMessage.getValue().getTXUniqId()).isEqualTo(uniqueId); + verify(txManager, never()).pauseTransaction(); + verify(txManager, never()).unpauseTransaction(any()); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/KeySetTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/KeySetTest.java index a8a4429..356b4be 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/KeySetTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/KeySetTest.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; + import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -31,6 +33,8 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.operations.KeySetOperationContext; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; @@ -100,6 +104,61 @@ public class KeySetTest { } @Test + public void retryKeySet_doesNotWriteTransactionException_ifIsNotInTransaction() throws Exception { + long startTime = 0; // arbitrary value + TestableKeySet keySet = new TestableKeySet(); + keySet.setIsInTransaction(false); + when(message.isRetry()).thenReturn(true); + when(region.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class)); + + keySet.cmdExecute(message, serverConnection, securityService, startTime); + + assertThat(keySet.exceptionSentToClient).isNull(); + } + + @Test + public void nonRetryKeySet_doesNotWriteTransactionException() throws Exception { + long startTime = 0; // arbitrary value + TestableKeySet keySet = new TestableKeySet(); + keySet.setIsInTransaction(true); + when(message.isRetry()).thenReturn(false); + when(region.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class)); + + keySet.cmdExecute(message, serverConnection, securityService, startTime); + + assertThat(keySet.exceptionSentToClient).isNull(); + } + + @Test + public void retryKeySet_doesNotWriteTransactionException_ifIsInTransactionAndIsNotPartitionedRegion() + throws Exception { + long startTime = 0; // arbitrary value + TestableKeySet keySet = new TestableKeySet(); + keySet.setIsInTransaction(true); + when(message.isRetry()).thenReturn(true); + when(region.getPartitionAttributes()).thenReturn(null); + + keySet.cmdExecute(message, serverConnection, securityService, startTime); + + assertThat(keySet.exceptionSentToClient).isNull(); + } + + @Test + public void retryKeySet_writesTransactionException_ifIsInTransactionAndIsPartitionedRegion() + throws Exception { + long startTime = 0; // arbitrary value + TestableKeySet keySet = new TestableKeySet(); + keySet.setIsInTransaction(true); + when(message.isRetry()).thenReturn(true); + when(region.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class)); + + keySet.cmdExecute(message, serverConnection, securityService, startTime); + + assertThat(keySet.exceptionSentToClient).isInstanceOf(TransactionException.class).hasMessage( + "Failover on a set operation of a partitioned region is not allowed in a transaction."); + } + + @Test public void noSecurityShouldSucceed() throws Exception { when(this.securityService.isClientSecurityRequired()).thenReturn(false); @@ -161,4 +220,23 @@ public class KeySetTest { verify(this.chunkedResponseMessage).sendChunk(eq(this.serverConnection)); } + private class TestableKeySet extends KeySet { + private boolean isInTransaction = false; + public Throwable exceptionSentToClient; + + public void setIsInTransaction(boolean isInTransaction) { + this.isInTransaction = isInTransaction; + } + + @Override + public boolean isInTransaction() { + return isInTransaction; + } + + @Override + protected void keySetWriteChunkedException(Message clientMessage, Throwable ex, + ServerConnection serverConnection) throws IOException { + this.exceptionSentToClient = ex; + } + } } -- To stop receiving notification emails like this one, please contact esh...@apache.org.