This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b1486ae  GEODE-4083: fix infinite loop caused by thread race changing version (#1161)
b1486ae is described below

commit b1486ae6d830d6c707264f1a03509a621cb95570
Author: pivotal-eshu <[email protected]>
AuthorDate: Thu Dec 14 08:05:28 2017 -0800

    GEODE-4083: fix infinite loop caused by thread race changing version (#1161)
    
    * GEODE-4083: fix infinite loop caused by thread race changing version
---
 .../cache/versions/RegionVersionVector.java        |  99 ++++++++++-------
 ...JUnitTest.java => RegionVersionVectorTest.java} | 122 +++++++++++++++++++--
 2 files changed, 173 insertions(+), 48 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
index aaa40a9..036365c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
@@ -17,7 +17,14 @@ package org.apache.geode.internal.cache.versions;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +34,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.annotations.TestingOnly;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -144,22 +152,56 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
   private final transient Object clearLockSync = new Object(); // sync for coordinating thread
                                                                // startup and lockOwner setting
 
-  /** create a live version vector for a region */
+  /**
+   * constructor used to create a cloned vector
+   */
+  protected RegionVersionVector(T ownerId, ConcurrentHashMap<T, RegionVersionHolder<T>> vector,
+      long version, ConcurrentHashMap<T, Long> gcVersions, long gcVersion, boolean singleMember,
+      RegionVersionHolder<T> localExceptions) {
+    this.myId = ownerId;
+    this.memberToVersion = vector;
+    this.memberToGCVersion = gcVersions;
+    this.localGCVersion.set(gcVersion);
+    this.localVersion.set(version);
+    this.singleMember = singleMember;
+    this.localExceptions = localExceptions;
+  }
+
+  /**
+   * deserialize a cloned vector
+   */
+  public RegionVersionVector() {
+    this.memberToVersion = new ConcurrentHashMap<T, RegionVersionHolder<T>>(INITIAL_CAPACITY,
+        LOAD_FACTOR, CONCURRENCY_LEVEL);
+    this.memberToGCVersion =
+        new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+  }
+
+  /**
+   * create a live version vector for a region
+   */
   public RegionVersionVector(T ownerId) {
     this(ownerId, null);
   }
 
-  /** create a live version vector for a region */
+  /**
+   * create a live version vector for a region
+   */
   public RegionVersionVector(T ownerId, LocalRegion owner) {
+    this(ownerId, owner, 0);
+  }
+
+  @TestingOnly
+  RegionVersionVector(T ownerId, LocalRegion owner, long version) {
     this.myId = ownerId;
     this.isLiveVector = true;
     this.region = owner;
-
     this.localExceptions = new RegionVersionHolder<T>(0);
-    this.memberToVersion = new ConcurrentHashMap<T, RegionVersionHolder<T>>(INITIAL_CAPACITY,
-        LOAD_FACTOR, CONCURRENCY_LEVEL);
+    this.memberToVersion =
+        new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
     this.memberToGCVersion =
-        new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+        new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+    this.localVersion.set(version);
   }
 
   /**
@@ -574,17 +616,20 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
     }
   }
 
-  private void updateLocalVersion(long version) {
-    boolean repeat = false;
+  void updateLocalVersion(long newVersion) {
+    boolean needToTrySetAgain;
     do {
-      long myVersion = this.localVersion.get();
-      if (myVersion < version) {
-        repeat = !this.localVersion.compareAndSet(myVersion, version);
+      needToTrySetAgain = false;
+      long currentVersion = this.localVersion.get();
+      if (currentVersion < newVersion) {
+        needToTrySetAgain = !compareAndSetVersion(currentVersion, newVersion);
       }
-    } while (repeat);
+    } while (needToTrySetAgain);
   }
 
-
+  boolean compareAndSetVersion(long currentVersion, long newVersion) {
+    return this.localVersion.compareAndSet(currentVersion, newVersion);
+  }
 
   /**
    * Records a received region-version. These are transmitted in VersionTags in messages between
@@ -1093,32 +1138,6 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
   }
 
   /**
-   * constructor used to create a cloned vector
-   *
-   * @param localExceptions
-   */
-  protected RegionVersionVector(T ownerId, ConcurrentHashMap<T, RegionVersionHolder<T>> vector,
-      long version, ConcurrentHashMap<T, Long> gcVersions, long gcVersion, boolean singleMember,
-      RegionVersionHolder<T> localExceptions) {
-    this.myId = ownerId;
-    this.memberToVersion = vector;
-    this.memberToGCVersion = gcVersions;
-    this.localGCVersion.set(gcVersion);
-    this.localVersion.set(version);
-    this.singleMember = singleMember;
-    this.localExceptions = localExceptions;
-  }
-
-
-  /** deserialize a cloned vector */
-  public RegionVersionVector() {
-    this.memberToVersion = new ConcurrentHashMap<T, RegionVersionHolder<T>>(INITIAL_CAPACITY,
-        LOAD_FACTOR, CONCURRENCY_LEVEL);
-    this.memberToGCVersion =
-        new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
-  }
-
-  /**
    * after deserializing a version tag or RVV the IDs in it should be replaced with references to
    * IDs returned by this method. This vastly reduces the memory footprint of tags/stamps/rvvs
    *
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
similarity index 85%
rename from geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
index b124fc7..e3e7014 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
@@ -14,8 +14,14 @@
  */
 package org.apache.geode.internal.cache.versions;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -27,8 +33,11 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
+import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -46,11 +55,20 @@ import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
-public class RegionVersionVectorJUnitTest {
+public class RegionVersionVectorTest {
+
+  private Future<Void> result;
 
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
+  @After
+  public void tearDown() throws Exception {
+    if (result != null && !result.isDone()) {
+      result.cancel(true);
+    }
+  }
+
   @Test
   public void testExceptionsWithContains() {
     DiskStoreID ownerId = new DiskStoreID(0, 0);
@@ -149,7 +167,6 @@ public class RegionVersionVectorJUnitTest {
     assertTrue(rv1.getExceptionCount(server2) == 0);
     assertTrue(rv1.contains(server2, 8));
 
-
     // Test RVV comparisons for GII Delta
     rv1 = new VMRegionVersionVector(server1);
     rv1.recordVersion(server2, 1);
@@ -255,8 +272,6 @@ public class RegionVersionVectorJUnitTest {
     }
     assertFalse(rv1.contains(server2, boundary + 1));
 
-    RegionVersionVector.DEBUG = true;
-
     rv1.recordVersion(server2, bitSetRollPoint);
     rv1.recordVersion(server2, bitSetRollPoint + 1); // bitSet should be rolled at this point
     RegionVersionHolder h = (RegionVersionHolder) rv1.getMemberToVersion().get(server2);
@@ -275,7 +290,7 @@ public class RegionVersionVectorJUnitTest {
     // now test the merge
     System.out.println("testing merge for " + rv1.fullToString());
     assertEquals(1, rv1.getExceptionCount(server2)); // one exception from boundary-1 to
-                                                     // bitSetRollPoint
+    // bitSetRollPoint
     assertFalse(rv1.contains(server2, bitSetRollPoint - 1));
     assertTrue(rv1.contains(server2, bitSetRollPoint));
     assertTrue(rv1.contains(server2, bitSetRollPoint + 1));
@@ -567,7 +582,53 @@ public class RegionVersionVectorJUnitTest {
     rvv.recordVersion(ownerId, tag);
   }
 
-  public RegionVersionVector createRegionVersionVector(InternalDistributedMember ownerId,
+  @Test
+  public void usesNewVersionIfGreaterThanOldVersion() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 1;
+    long newVersion = 2;
+
+    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
+    rvv.updateLocalVersion(newVersion);
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
+  }
+
+  @Test
+  public void usesOldVersionIfGreaterThanNewVersion() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 2;
+    long newVersion = 1;
+
+    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
+    rvv.updateLocalVersion(newVersion);
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
+  }
+
+  @Test
+  public void doesNothingIfVersionsAreSame() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 2;
+    long sameVersion = 2;
+
+    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
+    rvv.updateLocalVersion(sameVersion);
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
+  }
+
+  @Test
+  public void doesNotHangIfOtherThreadChangedVersion() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 1;
+    long newVersion = 2;
+
+    RegionVersionVector rvv = new VersionRaceConditionRegionVersionVector(ownerId, oldVersion);
+    result = CompletableFuture.runAsync(() -> rvv.updateLocalVersion(newVersion));
+
+    assertThatCode(() -> result.get(2, SECONDS)).doesNotThrowAnyException();
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
+  }
+
+  private RegionVersionVector createRegionVersionVector(InternalDistributedMember ownerId,
       LocalRegion owner) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     RegionVersionVector rvv = new RegionVersionVector(ownerId, owner) {
@@ -615,4 +676,49 @@ public class RegionVersionVectorJUnitTest {
     assertEquals(0, rvv.getExceptionCount(id));
   }
 
+  private class TestableRegionVersionVector
+      extends RegionVersionVector<VersionSource<InternalDistributedMember>> {
+
+    TestableRegionVersionVector(VersionSource<InternalDistributedMember> ownerId, long version) {
+      super(ownerId, null, version);
+    }
+
+    @Override
+    protected RegionVersionVector createCopy(VersionSource ownerId, ConcurrentHashMap vector,
+        long version, ConcurrentHashMap gcVersions, long gcVersion, boolean singleMember,
+        RegionVersionHolder clonedLocalHolder) {
+      return null;
+    }
+
+    @Override
+    protected VersionSource<InternalDistributedMember> readMember(DataInput in)
+        throws IOException, ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    protected void writeMember(VersionSource member, DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public int getDSFID() {
+      return 0;
+    }
+  }
+
+  private class VersionRaceConditionRegionVersionVector extends TestableRegionVersionVector {
+
+    VersionRaceConditionRegionVersionVector(VersionSource<InternalDistributedMember> ownerId,
+        long version) {
+      super(ownerId, version);
+    }
+
+    @Override
+    boolean compareAndSetVersion(long currentVersion, long newVersion) {
+      super.compareAndSetVersion(currentVersion, newVersion);
+      return false;
+    }
+
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to