This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new b1486ae GEODE-4083: fix infinite loop caused by thread race changing
version (#1161)
b1486ae is described below
commit b1486ae6d830d6c707264f1a03509a621cb95570
Author: pivotal-eshu <[email protected]>
AuthorDate: Thu Dec 14 08:05:28 2017 -0800
GEODE-4083: fix infinite loop caused by thread race changing version (#1161)
* GEODE-4083: fix infinite loop caused by thread race changing version
---
.../cache/versions/RegionVersionVector.java | 99 ++++++++++-------
...JUnitTest.java => RegionVersionVectorTest.java} | 122 +++++++++++++++++++--
2 files changed, 173 insertions(+), 48 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
index aaa40a9..036365c 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
@@ -17,7 +17,14 @@ package org.apache.geode.internal.cache.versions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -27,6 +34,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.annotations.TestingOnly;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionConfig;
@@ -144,22 +152,56 @@ public abstract class RegionVersionVector<T extends
VersionSource<?>>
private final transient Object clearLockSync = new Object(); // sync for
coordinating thread
// startup and
lockOwner setting
- /** create a live version vector for a region */
+ /**
+ * constructor used to create a cloned vector
+ */
+ protected RegionVersionVector(T ownerId, ConcurrentHashMap<T,
RegionVersionHolder<T>> vector,
+ long version, ConcurrentHashMap<T, Long> gcVersions, long gcVersion,
boolean singleMember,
+ RegionVersionHolder<T> localExceptions) {
+ this.myId = ownerId;
+ this.memberToVersion = vector;
+ this.memberToGCVersion = gcVersions;
+ this.localGCVersion.set(gcVersion);
+ this.localVersion.set(version);
+ this.singleMember = singleMember;
+ this.localExceptions = localExceptions;
+ }
+
+ /**
+ * deserialize a cloned vector
+ */
+ public RegionVersionVector() {
+ this.memberToVersion = new ConcurrentHashMap<T,
RegionVersionHolder<T>>(INITIAL_CAPACITY,
+ LOAD_FACTOR, CONCURRENCY_LEVEL);
+ this.memberToGCVersion =
+ new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR,
CONCURRENCY_LEVEL);
+ }
+
+ /**
+ * create a live version vector for a region
+ */
public RegionVersionVector(T ownerId) {
this(ownerId, null);
}
- /** create a live version vector for a region */
+ /**
+ * create a live version vector for a region
+ */
public RegionVersionVector(T ownerId, LocalRegion owner) {
+ this(ownerId, owner, 0);
+ }
+
+ @TestingOnly
+ RegionVersionVector(T ownerId, LocalRegion owner, long version) {
this.myId = ownerId;
this.isLiveVector = true;
this.region = owner;
-
this.localExceptions = new RegionVersionHolder<T>(0);
- this.memberToVersion = new ConcurrentHashMap<T,
RegionVersionHolder<T>>(INITIAL_CAPACITY,
- LOAD_FACTOR, CONCURRENCY_LEVEL);
+ this.memberToVersion =
+ new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR,
CONCURRENCY_LEVEL);
this.memberToGCVersion =
- new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR,
CONCURRENCY_LEVEL);
+ new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR,
CONCURRENCY_LEVEL);
+ this.localVersion.set(version);
}
/**
@@ -574,17 +616,20 @@ public abstract class RegionVersionVector<T extends
VersionSource<?>>
}
}
- private void updateLocalVersion(long version) {
- boolean repeat = false;
+ void updateLocalVersion(long newVersion) {
+ boolean needToTrySetAgain;
do {
- long myVersion = this.localVersion.get();
- if (myVersion < version) {
- repeat = !this.localVersion.compareAndSet(myVersion, version);
+ needToTrySetAgain = false;
+ long currentVersion = this.localVersion.get();
+ if (currentVersion < newVersion) {
+ needToTrySetAgain = !compareAndSetVersion(currentVersion, newVersion);
}
- } while (repeat);
+ } while (needToTrySetAgain);
}
-
+ boolean compareAndSetVersion(long currentVersion, long newVersion) {
+ return this.localVersion.compareAndSet(currentVersion, newVersion);
+ }
/**
* Records a received region-version. These are transmitted in VersionTags
in messages between
@@ -1093,32 +1138,6 @@ public abstract class RegionVersionVector<T extends
VersionSource<?>>
}
/**
- * constructor used to create a cloned vector
- *
- * @param localExceptions
- */
- protected RegionVersionVector(T ownerId, ConcurrentHashMap<T,
RegionVersionHolder<T>> vector,
- long version, ConcurrentHashMap<T, Long> gcVersions, long gcVersion,
boolean singleMember,
- RegionVersionHolder<T> localExceptions) {
- this.myId = ownerId;
- this.memberToVersion = vector;
- this.memberToGCVersion = gcVersions;
- this.localGCVersion.set(gcVersion);
- this.localVersion.set(version);
- this.singleMember = singleMember;
- this.localExceptions = localExceptions;
- }
-
-
- /** deserialize a cloned vector */
- public RegionVersionVector() {
- this.memberToVersion = new ConcurrentHashMap<T,
RegionVersionHolder<T>>(INITIAL_CAPACITY,
- LOAD_FACTOR, CONCURRENCY_LEVEL);
- this.memberToGCVersion =
- new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR,
CONCURRENCY_LEVEL);
- }
-
- /**
* after deserializing a version tag or RVV the IDs in it should be replaced
with references to
* IDs returned by this method. This vastly reduces the memory footprint of
tags/stamps/rvvs
*
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
similarity index 85%
rename from
geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
rename to
geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
index b124fc7..e3e7014 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
@@ -14,8 +14,14 @@
*/
package org.apache.geode.internal.cache.versions;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -27,8 +33,11 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -46,11 +55,20 @@ import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.junit.categories.UnitTest;
@Category(UnitTest.class)
-public class RegionVersionVectorJUnitTest {
+public class RegionVersionVectorTest {
+
+ private Future<Void> result;
@Rule
public ExpectedException expectedException = ExpectedException.none();
+ @After
+ public void tearDown() throws Exception {
+ if (result != null && !result.isDone()) {
+ result.cancel(true);
+ }
+ }
+
@Test
public void testExceptionsWithContains() {
DiskStoreID ownerId = new DiskStoreID(0, 0);
@@ -149,7 +167,6 @@ public class RegionVersionVectorJUnitTest {
assertTrue(rv1.getExceptionCount(server2) == 0);
assertTrue(rv1.contains(server2, 8));
-
// Test RVV comparisons for GII Delta
rv1 = new VMRegionVersionVector(server1);
rv1.recordVersion(server2, 1);
@@ -255,8 +272,6 @@ public class RegionVersionVectorJUnitTest {
}
assertFalse(rv1.contains(server2, boundary + 1));
- RegionVersionVector.DEBUG = true;
-
rv1.recordVersion(server2, bitSetRollPoint);
rv1.recordVersion(server2, bitSetRollPoint + 1); // bitSet should be
rolled at this point
RegionVersionHolder h = (RegionVersionHolder)
rv1.getMemberToVersion().get(server2);
@@ -275,7 +290,7 @@ public class RegionVersionVectorJUnitTest {
// now test the merge
System.out.println("testing merge for " + rv1.fullToString());
assertEquals(1, rv1.getExceptionCount(server2)); // one exception from
boundary-1 to
- // bitSetRollPoint
+ // bitSetRollPoint
assertFalse(rv1.contains(server2, bitSetRollPoint - 1));
assertTrue(rv1.contains(server2, bitSetRollPoint));
assertTrue(rv1.contains(server2, bitSetRollPoint + 1));
@@ -567,7 +582,53 @@ public class RegionVersionVectorJUnitTest {
rvv.recordVersion(ownerId, tag);
}
- public RegionVersionVector
createRegionVersionVector(InternalDistributedMember ownerId,
+ @Test
+ public void usesNewVersionIfGreaterThanOldVersion() throws Exception {
+ VersionSource<InternalDistributedMember> ownerId =
mock(VersionSource.class);
+ long oldVersion = 1;
+ long newVersion = 2;
+
+ RegionVersionVector rvv = new TestableRegionVersionVector(ownerId,
oldVersion);
+ rvv.updateLocalVersion(newVersion);
+ assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
+ }
+
+ @Test
+ public void usesOldVersionIfGreaterThanNewVersion() throws Exception {
+ VersionSource<InternalDistributedMember> ownerId =
mock(VersionSource.class);
+ long oldVersion = 2;
+ long newVersion = 1;
+
+ RegionVersionVector rvv = new TestableRegionVersionVector(ownerId,
oldVersion);
+ rvv.updateLocalVersion(newVersion);
+ assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
+ }
+
+ @Test
+ public void doesNothingIfVersionsAreSame() throws Exception {
+ VersionSource<InternalDistributedMember> ownerId =
mock(VersionSource.class);
+ long oldVersion = 2;
+ long sameVersion = 2;
+
+ RegionVersionVector rvv = new TestableRegionVersionVector(ownerId,
oldVersion);
+ rvv.updateLocalVersion(sameVersion);
+ assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
+ }
+
+ @Test
+ public void doesNotHangIfOtherThreadChangedVersion() throws Exception {
+ VersionSource<InternalDistributedMember> ownerId =
mock(VersionSource.class);
+ long oldVersion = 1;
+ long newVersion = 2;
+
+ RegionVersionVector rvv = new
VersionRaceConditionRegionVersionVector(ownerId, oldVersion);
+ result = CompletableFuture.runAsync(() ->
rvv.updateLocalVersion(newVersion));
+
+ assertThatCode(() -> result.get(2, SECONDS)).doesNotThrowAnyException();
+ assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
+ }
+
+ private RegionVersionVector
createRegionVersionVector(InternalDistributedMember ownerId,
LocalRegion owner) {
@SuppressWarnings({"unchecked", "rawtypes"})
RegionVersionVector rvv = new RegionVersionVector(ownerId, owner) {
@@ -615,4 +676,49 @@ public class RegionVersionVectorJUnitTest {
assertEquals(0, rvv.getExceptionCount(id));
}
+ private class TestableRegionVersionVector
+ extends RegionVersionVector<VersionSource<InternalDistributedMember>> {
+
+ TestableRegionVersionVector(VersionSource<InternalDistributedMember>
ownerId, long version) {
+ super(ownerId, null, version);
+ }
+
+ @Override
+ protected RegionVersionVector createCopy(VersionSource ownerId,
ConcurrentHashMap vector,
+ long version, ConcurrentHashMap gcVersions, long gcVersion, boolean
singleMember,
+ RegionVersionHolder clonedLocalHolder) {
+ return null;
+ }
+
+ @Override
+ protected VersionSource<InternalDistributedMember> readMember(DataInput in)
+ throws IOException, ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ protected void writeMember(VersionSource member, DataOutput out) throws
IOException {
+
+ }
+
+ @Override
+ public int getDSFID() {
+ return 0;
+ }
+ }
+
+ private class VersionRaceConditionRegionVersionVector extends
TestableRegionVersionVector {
+
+
VersionRaceConditionRegionVersionVector(VersionSource<InternalDistributedMember>
ownerId,
+ long version) {
+ super(ownerId, version);
+ }
+
+ @Override
+ boolean compareAndSetVersion(long currentVersion, long newVersion) {
+ super.compareAndSetVersion(currentVersion, newVersion);
+ return false;
+ }
+
+ }
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].