This is an automated email from the ASF dual-hosted git repository.

jchen21 pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new 6b85568  GEODE-8671: Two threads calling get and retrieve the same 
PdxInstance, resulting in corruption (#5925)
6b85568 is described below

commit 6b85568cc21678d5854fb89f2fdaecd2a22ccd2f
Author: Jianxia Chen <11181423+jche...@users.noreply.github.com>
AuthorDate: Wed Mar 3 09:19:36 2021 -0800

    GEODE-8671: Two threads calling get and retrieve the same PdxInstance, resulting in corruption (#5925)
    
    For PdxInstance values, LocalRegion.optimizedGetObject() now returns a new
    reference instead of reusing the value held in the Future. This avoids PDX
    corruption when multiple threads share the same PdxInstance reference.
    
    (cherry picked from commit dabb610b74bb0b27603d7803ec3cdd1cbb16c43f)
---
 .../cache/RegionConcurrentOperationDUnitTest.java  | 102 ++++++++++++++++-----
 .../apache/geode/internal/cache/LocalRegion.java   |  33 ++++---
 .../geode/internal/cache/LocalRegionTest.java      |  43 +++++++++
 3 files changed, 146 insertions(+), 32 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
index d0ccd1d..cda199d 100755
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
@@ -14,41 +14,42 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.apache.geode.cache.RegionShortcut.PARTITION;
 import static org.apache.geode.cache.RegionShortcut.REPLICATE;
 import static org.apache.geode.cache.RegionShortcut.REPLICATE_PROXY;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.Serializable;
+import java.time.Duration;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Scope;
-import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
 import org.apache.geode.test.dunit.rules.DistributedRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class RegionConcurrentOperationDUnitTest implements Serializable {
 
-  private static DUnitBlackboard blackboard;
-
   Object key = "KEY";
   String value = "VALUE";
 
-  private static DUnitBlackboard getBlackboard() {
-    if (blackboard == null) {
-      blackboard = new DUnitBlackboard();
-    }
+  private DistributedBlackboard getBlackboard() {
     return blackboard;
   }
 
@@ -61,10 +62,8 @@ public class RegionConcurrentOperationDUnitTest implements 
Serializable {
   @Rule
   public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
 
-  @After
-  public void tearDown() {
-    blackboard.initBlackboard();
-  }
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
 
   @Test
   public void getOnProxyRegionFromMultipleThreadsReturnsDifferentObjects() 
throws Exception {
@@ -80,13 +79,13 @@ public class RegionConcurrentOperationDUnitTest implements 
Serializable {
           .setCacheLoader(new TestCacheLoader()).create(regionName);
     });
 
-    Future get1 = executorServiceRule.submit(() -> {
-      Region region = cacheRule.getCache().getRegion(regionName);
+    Future<Object> get1 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = 
cacheRule.getCache().getRegion(regionName);
       return region.get(key);
     });
 
-    Future get2 = executorServiceRule.submit(() -> {
-      Region region = cacheRule.getCache().getRegion(regionName);
+    Future<Object> get2 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = 
cacheRule.getCache().getRegion(regionName);
       getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
       return region.get(key);
     });
@@ -113,13 +112,13 @@ public class RegionConcurrentOperationDUnitTest 
implements Serializable {
     });
     assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(0);
 
-    Future get1 = executorServiceRule.submit(() -> {
-      Region region = cacheRule.getCache().getRegion(regionName);
+    Future<Object> get1 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = 
cacheRule.getCache().getRegion(regionName);
       return region.get(key);
     });
 
-    Future get2 = executorServiceRule.submit(() -> {
-      Region region = cacheRule.getCache().getRegion(regionName);
+    Future<Object> get2 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = 
cacheRule.getCache().getRegion(regionName);
       getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
       return region.get(key);
     });
@@ -131,7 +130,50 @@ public class RegionConcurrentOperationDUnitTest implements 
Serializable {
     assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(1);
   }
 
-  private class TestCacheLoader implements CacheLoader, Serializable {
+  @Test
+  public void 
getOnPartitionedRegionFromMultipleThreadsReturnsDifferentPdxInstances()
+      throws Exception {
+    String regionName = getClass().getSimpleName();
+    CacheFactory cacheFactory = new CacheFactory();
+    cacheFactory.setPdxReadSerialized(true);
+    cacheRule.createCache(cacheFactory);
+    InternalCache cache = cacheRule.getCache();
+    cache.setCopyOnRead(true);
+    Region<Object, Object> region = cache.createRegionFactory(PARTITION)
+        .create(regionName);
+
+    // Keep doing this concurrency test for 30 seconds.
+    long endTime = Duration.ofSeconds(30).toMillis() + 
System.currentTimeMillis();
+
+    while (System.currentTimeMillis() < endTime) {
+      Callable<Object> getValue = () -> {
+        while (true) {
+          Object value = region.get(key);
+          if (value != null) {
+            return value;
+          }
+        }
+      };
+
+      // In this test, two threads are doing gets. One thread puts the value
+      // We expect that the threads will *always* get different PdxInstance 
values
+      Future<Object> get1 = executorServiceRule.submit(getValue);
+      Future<Object> get2 = executorServiceRule.submit(getValue);
+      Future<Object> put = executorServiceRule.submit(() -> region.put(key, 
new TestValue()));
+
+      Object get1value = get1.get();
+      Object get2value = get2.get();
+      put.get();
+
+      // Assert the values returned are different objects.
+      // PdxInstances are not threadsafe and should not be shared between 
threads.
+      assertThat(get1value).isNotSameAs(get2value);
+      region.destroy(key);
+
+    }
+  }
+
+  private class TestCacheLoader implements CacheLoader<Object, Object>, 
Serializable {
 
     @Override
     public synchronized Object load(LoaderHelper helper) {
@@ -142,4 +184,22 @@ public class RegionConcurrentOperationDUnitTest implements 
Serializable {
     @Override
     public void close() {}
   }
+
+  private static class TestValue implements PdxSerializable {
+    int field1 = 5;
+    String field2 = "field";
+
+    @Override
+    public void toData(PdxWriter writer) {
+      writer.writeInt("field1", field1);
+      writer.writeString("field2", field2);
+
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      reader.readInt("field1");
+      reader.readString("field2");
+    }
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index dbe3ced..cf76f07 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -1449,8 +1449,8 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     return result;
   }
 
-
-  private Object getObject(KeyInfo keyInfo, boolean isCreate, boolean 
generateCallbacks,
+  @VisibleForTesting
+  Object getObject(KeyInfo keyInfo, boolean isCreate, boolean 
generateCallbacks,
       Object localValue, boolean disableCopyOnRead, boolean preferCD,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
       boolean returnTombstones) {
@@ -1489,10 +1489,16 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     return result;
   }
 
+  @VisibleForTesting
+  Map getGetFutures() {
+    return this.getFutures;
+  }
+
   /**
    * optimized to only allow one thread to do a search/load, other threads 
wait on a future
    */
-  private Object optimizedGetObject(KeyInfo keyInfo, boolean isCreate, boolean 
generateCallbacks,
+  @VisibleForTesting
+  Object optimizedGetObject(KeyInfo keyInfo, boolean isCreate, boolean 
generateCallbacks,
       Object localValue, boolean disableCopyOnRead, boolean preferCD,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
       boolean returnTombstones) {
@@ -1505,9 +1511,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
         Object[] valueAndVersion = (Object[]) otherFuture.get();
         if (valueAndVersion != null) {
           result = valueAndVersion[0];
-          if (clientEvent != null) {
-            clientEvent.setVersionTag((VersionTag) valueAndVersion[1]);
-          }
+
           if (!preferCD && result instanceof CachedDeserializable) {
             CachedDeserializable cd = (CachedDeserializable) result;
             if (!disableCopyOnRead && (isCopyOnRead() || isProxy())) {
@@ -1519,12 +1523,19 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
           } else if (!disableCopyOnRead) {
             result = conditionalCopy(result);
           }
-          // what was a miss is now a hit
-          if (isCreate) {
-            RegionEntry regionEntry = basicGetEntry(keyInfo.getKey());
-            updateStatsForGet(regionEntry, true);
+          // GEODE-8671: for PdxInstance, we need a new reference of it. Don't 
use the value from
+          // the Future.
+          if (!(result instanceof PdxInstance)) {
+            if (clientEvent != null) {
+              clientEvent.setVersionTag((VersionTag) valueAndVersion[1]);
+            }
+            // what was a miss is now a hit
+            if (isCreate) {
+              RegionEntry regionEntry = basicGetEntry(keyInfo.getKey());
+              updateStatsForGet(regionEntry, true);
+            }
+            return result;
           }
-          return result;
         }
       } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
index 44599be..4b0e72d 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
@@ -56,7 +56,11 @@ import 
org.apache.geode.internal.cache.AbstractRegion.PoolFinder;
 import org.apache.geode.internal.cache.LocalRegion.RegionMapConstructor;
 import 
org.apache.geode.internal.cache.LocalRegion.ServerRegionProxyConstructor;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.util.concurrent.FutureResult;
+import org.apache.geode.pdx.PdxInstance;
 
 public class LocalRegionTest {
   private EntryEventFactory entryEventFactory;
@@ -302,4 +306,43 @@ public class LocalRegionTest {
     verify(drs).incNumOverflowOnDisk(numOverflowOnDisk);
     verify(drs).incNumOverflowBytesOnDisk(numOverflowBytesOnDisk);
   }
+
+  @Test
+  public void forPdxInstanceByPassTheFutureInLocalRegionOptimizedGetObject() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, 
internalRegionArguments,
+            internalDataView, regionMapConstructor, 
serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+    KeyInfo keyInfo = mock(KeyInfo.class);
+    Object key = new Object();
+    Object result = new Object();
+    when(keyInfo.getKey()).thenReturn(key);
+    FutureResult thisFuture = new FutureResult(mock(CancelCriterion.class));
+    thisFuture.set(new Object[] {result, mock(VersionTag.class)});
+    region.getGetFutures().put(key, thisFuture);
+    // For non-PdxInstance, use the value in the Future
+    Object object = region.optimizedGetObject(keyInfo, true, true,
+        new Object(), true, true,
+        mock(ClientProxyMembershipID.class), mock(EntryEventImpl.class),
+        true);
+    assertThat(object).isSameAs(result);
+
+    // For PdxInstance, return a new reference by getObject(), bypassing the 
Future
+    result = mock(PdxInstance.class);
+    thisFuture.set(new Object[] {result, mock(VersionTag.class)});
+    Object newResult = new Object();
+    Object localValue = new Object();
+    ClientProxyMembershipID requestingClient = 
mock(ClientProxyMembershipID.class);
+    EntryEventImpl clientEvent = mock(EntryEventImpl.class);
+    when(region.getObject(keyInfo, true, true,
+        localValue, true, true,
+        requestingClient, clientEvent,
+        true)).thenReturn(newResult);
+    object = region.optimizedGetObject(keyInfo, true, true,
+        localValue, true, true,
+        requestingClient, clientEvent,
+        true);
+    assertThat(object).isNotSameAs(result);
+    assertThat(object).isSameAs(newResult);
+  }
 }

Reply via email to