This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a94fd4d GEODE-3516: Avoid tryResume call to add the thread again into
the waiting thread queue
a94fd4d is described below
commit a94fd4dd288277df54b1b3b90959097c928ff67d
Author: eshu <[email protected]>
AuthorDate: Wed Sep 6 14:53:47 2017 -0700
GEODE-3516: Avoid tryResume call to add the thread again into the waiting
thread queue
Avoid tryResume add a thread multiple times into the waiting queue.
Add a unit test that verify the fix.
---
.../apache/geode/internal/cache/TXManagerImpl.java | 61 ++++++++++++----------
.../internal/cache/TXManagerImplJUnitTest.java | 59 +++++++++++++++++++++
2 files changed, 92 insertions(+), 28 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index a0a4d7c..b106546 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1301,50 +1301,55 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
*/
private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new
ConcurrentHashMap<>();
+ Queue<Thread> getWaitQueue(TransactionId transactionId) {
+ return waitMap.get(transactionId);
+ }
+
+ private Queue<Thread> getOrCreateWaitQueue(TransactionId transactionId) {
+ Queue<Thread> threadq = getWaitQueue(transactionId);
+ if (threadq == null) {
+ threadq = new ConcurrentLinkedQueue<Thread>();
+ Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq);
+ if (oldq != null) {
+ threadq = oldq;
+ }
+ }
+ return threadq;
+ }
+
public boolean tryResume(TransactionId transactionId, long time, TimeUnit
unit) {
if (transactionId == null || getTXState() != null ||
!exists(transactionId)) {
return false;
}
- Thread currentThread = Thread.currentThread();
- long timeout = unit.toNanos(time);
- long startTime = System.nanoTime();
- Queue<Thread> threadq = null;
+ final Thread currentThread = Thread.currentThread();
+ final long endTime = System.nanoTime() + unit.toNanos(time);
+ final Queue<Thread> threadq = getOrCreateWaitQueue(transactionId);
try {
while (true) {
- threadq = waitMap.get(transactionId);
- if (threadq == null) {
- threadq = new ConcurrentLinkedQueue<Thread>();
- Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq);
- if (oldq != null) {
- threadq = oldq;
- }
+ if (!threadq.contains(currentThread)) {
+ threadq.add(currentThread);
}
- threadq.add(currentThread);
- // after putting this thread in waitMap, we should check for
- // an entry in suspendedTXs. if no entry is found in suspendedTXs
- // next invocation of suspend() will unblock this thread
if (tryResume(transactionId)) {
return true;
- } else if (!exists(transactionId)) {
+ }
+ if (!exists(transactionId)) {
return false;
}
- LockSupport.parkNanos(timeout);
- long nowTime = System.nanoTime();
- timeout -= nowTime - startTime;
- startTime = nowTime;
- if (timeout <= 0) {
- break;
+ long parkTimeout = endTime - System.nanoTime();
+ if (parkTimeout <= 0) {
+ return false;
}
+ parkToRetryResume(parkTimeout);
}
} finally {
- threadq = waitMap.get(transactionId);
- if (threadq != null) {
- threadq.remove(currentThread);
- // the queue itself will be removed at commit/rollback
- }
+ threadq.remove(currentThread);
+ // the queue itself will be removed at commit/rollback
}
- return false;
+ }
+
+ void parkToRetryResume(long timeout) {
+ LockSupport.parkNanos(timeout);
}
public boolean exists(TransactionId transactionId) {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
index a2c1e70..d252d92 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplJUnitTest.java
@@ -14,6 +14,9 @@
*/
package org.apache.geode.internal.cache;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
import org.apache.geode.cache.*;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -23,8 +26,11 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.Properties;
+import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -331,4 +337,57 @@ public class TXManagerImplJUnitTest {
protected void callIsDistributed(TXManagerImpl txMgr) {
assertFalse(txMgr.isDistributed());
}
+
+ @Test
+ public void testTryResumeRemoveItselfFromWaitingQueue() throws Exception {
+ int time = 30;
+ long timeout = TimeUnit.SECONDS.toNanos(time);
+ TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
+ TXManagerImpl spyMgr = spy(txMgr);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(10);
+ return null;
+ }
+ }).when(spyMgr).parkToRetryResume(timeout);
+ spyMgr.begin();
+ region.put("key", "value");
+ final TransactionId txId = spyMgr.suspend();
+ spyMgr.resume(txId);
+ final CountDownLatch latch1 = new CountDownLatch(2);
+ final CountDownLatch latch2 = new CountDownLatch(2);
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ latch1.countDown();
+ assertTrue(spyMgr.tryResume(txId, time, TimeUnit.SECONDS));
+ region.put("key1", "value1");
+ assertEquals(txId, spyMgr.suspend());
+ latch2.countDown();
+ }
+ });
+ Thread t2 = new Thread(new Runnable() {
+ public void run() {
+ latch1.countDown();
+ assertTrue(spyMgr.tryResume(txId, time, TimeUnit.SECONDS));
+ region.put("key2", "value1");
+ assertEquals(txId, spyMgr.suspend());
+ latch2.countDown();
+ }
+ });
+ t1.start();
+ t2.start();
+ Thread.sleep(300);
+ if (!latch1.await(30, TimeUnit.SECONDS)) {
+ fail("junit test failed");
+ }
+ spyMgr.suspend();
+ if (!latch2.await(30, TimeUnit.SECONDS)) {
+ fail("junit test failed");
+ }
+ spyMgr.tryResume(txId, time, TimeUnit.SECONDS);
+ assertEquals(3, region.size());
+ assertEquals(0, spyMgr.getWaitQueue(txId).size());
+ spyMgr.commit();
+ }
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].