[ 
https://issues.apache.org/jira/browse/GEODE-4285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329696#comment-16329696
 ] 

ASF GitHub Bot commented on GEODE-4285:
---------------------------------------

upthewaterspout closed pull request #1273: GEODE-4285: Get a distributed lock if we can't find a PDX type
URL: https://github.com/apache/geode/pull/1273
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
 
b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
index 0752066146..32e4c31ce9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
+++ 
b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
@@ -446,13 +446,27 @@ private void checkDistributedTypeRegistryState() {
   }
 
   public PdxType getType(int typeId) {
+    return getById(typeId);
+  }
+
+  private <T> T getById(Object typeId) {
     verifyConfiguration();
     TXStateProxy currentState = suspendTX();
     try {
-      return (PdxType) getIdToType().get(typeId);
+      T pdxType = (T) getIdToType().get(typeId);
+      if (pdxType == null) {
+        lock();
+        try {
+          pdxType = (T) getIdToType().get(typeId);
+        } finally {
+          unlock();
+        }
+      }
+      return pdxType;
     } finally {
       resumeTX(currentState);
     }
+
   }
 
   public void addRemoteType(int typeId, PdxType type) {
@@ -466,7 +480,7 @@ public void addRemoteType(int typeId, PdxType type) {
         // the distributed lock.
         lock();
         try {
-          r.put(typeId, type);
+          r.putIfAbsent(typeId, type);
         } finally {
           unlock();
         }
@@ -703,14 +717,8 @@ public int defineEnum(EnumInfo newInfo) {
   }
 
   public EnumInfo getEnumById(int id) {
-    verifyConfiguration();
     EnumId enumId = new EnumId(id);
-    TXStateProxy currentState = suspendTX();
-    try {
-      return (EnumInfo) getIdToType().get(enumId);
-    } finally {
-      resumeTX(currentState);
-    }
+    return getById(enumId);
   }
 
   @Override
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
index abcd142cfa..be027c98e7 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java
@@ -16,14 +16,31 @@
 
 import static org.junit.Assert.*;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.awaitility.Awaitility;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.UpdateOperation;
 import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.pdx.PdxClientServerDUnitTest;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.pdx.internal.PeerTypeRegistration;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
@@ -35,6 +52,7 @@
 public class PDXNewWanDUnitTest extends WANTestBase {
 
   private static final long serialVersionUID = 1L;
+  public static final String KEY_0 = "Key_0";
 
   public PDXNewWanDUnitTest() {
     super();
@@ -496,6 +514,115 @@ public void testWANPDX_PR_ParallelSender() {
     vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + 
"_PR", 1));
   }
 
+  @Test
+  public void testWANPDX_PR_ParallelSender_WithDelayedTypeRegistry()
+      throws InterruptedException, ExecutionException {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // Create the receiver side of the WAN gateway. Only vm2 will be a 
receiver, vm3 is
+    // just a peer
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 0, 4,
+        isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 0, 4,
+        isOffHeap()));
+
+    AsyncInvocation deserializationFuture;
+    try {
+      // Delay processing of sending type registry update from vm2
+      vm2.invoke(() -> {
+        DistributionMessageObserver.setInstance(new 
BlockingPdxTypeUpdateObserver());
+      });
+
+      // Create the sender side of the WAN connection. 2 VMs, with paused 
senders
+      vm4.invoke(() -> WANTestBase.createCache(lnPort));
+      vm5.invoke(() -> WANTestBase.createCache(lnPort));
+
+      vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, false));
+      vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, false));
+
+      // Create the partitioned region in vm4
+      vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() 
+ "_PR", "ln", 0, 4,
+          isOffHeap()));
+
+      vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() 
+ "_PR", "ln", 0, 4,
+          isOffHeap()));
+
+      vm5.invoke(() -> {
+        Region region = cache.getRegion(getTestMethodName() + "_PR");
+        PartitionRegionHelper.assignBucketsToPartitions(region);
+      });
+
+      vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+      vm5.invoke(() -> WANTestBase.pauseSender("ln"));
+
+      // Do some puts to fill up our queues
+      vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 20));
+
+      vm4.invoke(() -> {
+        final Region r = cache.getRegion(Region.SEPARATOR + 
getTestMethodName() + "_PR");
+        PdxValue result = (PdxValue) r.put(KEY_0, new PdxValue(0));
+      });
+
+      // Force VM4 to be the primary
+      vm4.invoke(() -> {
+        final Region region = cache.getRegion(Region.SEPARATOR + 
getTestMethodName() + "_PR");
+        DistributedMember primary = 
PartitionRegionHelper.getPrimaryMemberForKey(region, KEY_0);
+        // If we are not the primary
+        DistributedMember localMember = 
cache.getDistributedSystem().getDistributedMember();
+        if (!primary.equals(localMember)) {
+          PartitionRegionHelper.moveBucketByKey(region, primary, localMember, 
KEY_0);
+
+        }
+      });
+
+      vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+
+      boolean blocking = vm2.invoke(() -> {
+        BlockingPdxTypeUpdateObserver observer =
+            (BlockingPdxTypeUpdateObserver) 
DistributionMessageObserver.getInstance();
+        return observer.startedBlocking.await(1, TimeUnit.MINUTES);
+      });
+
+      assertTrue(blocking);
+
+      vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+
+      vm2.invoke(() -> {
+        final Region region = cache.getRegion(Region.SEPARATOR + 
getTestMethodName() + "_PR");
+        Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
region.containsKey(KEY_0));
+
+      });
+
+      // Make sure vm3 can deserialize the value
+      deserializationFuture = vm3.invokeAsync(() -> {
+        final Region r = cache.getRegion(Region.SEPARATOR + 
getTestMethodName() + "_PR");
+        PdxValue result = (PdxValue) r.get(KEY_0);
+        assertEquals(result, new PdxValue(0));
+      });
+
+      try {
+        deserializationFuture.await(10, TimeUnit.SECONDS);
+        fail("Get should have been blocked waiting for PDX type to be 
distributed");
+      } catch (TimeoutException e) {
+        // This is what we hope will happen. The get will be blocked by some 
sort of lock, rather
+        // than failing due to a missing type.
+      }
+
+    } finally {
+
+      vm2.invoke(() -> {
+        BlockingPdxTypeUpdateObserver observer =
+            (BlockingPdxTypeUpdateObserver) 
DistributionMessageObserver.getInstance();
+        observer.latch.countDown();
+      });
+    }
+
+    deserializationFuture.get();
+  }
+
   @Test
   public void testWANPDX_PR_ParallelSender_47826() {
     Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
@@ -724,5 +851,67 @@ public static void verifyFilterInvocation(int invocation) {
   }
 
 
+  private static class BlockingPdxTypeUpdateObserver extends 
DistributionMessageObserver {
+    private CountDownLatch latch = new CountDownLatch(1);
+    private CountDownLatch startedBlocking = new CountDownLatch(1);
+
+    @Override
+    public void beforeSendMessage(ClusterDistributionManager dm, 
DistributionMessage message) {
+      if (message instanceof UpdateOperation.UpdateMessage
+          && ((UpdateOperation.UpdateMessage) message).getRegionPath()
+              .contains(PeerTypeRegistration.REGION_FULL_PATH)) {
+        startedBlocking.countDown();
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted", e);
+        }
+
+      }
+    }
+  }
+
+  public static class PdxValue implements PdxSerializable {
+    public int value;
+
+    public PdxValue() {
+
+    }
+
+    public PdxValue(int value) {
+      this.value = value;
+    }
+
+    @Override
+    public void toData(PdxWriter writer) {
+      writer.writeInt("value", value);
+
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      value = reader.readInt("value");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      PdxValue pdxValue = (PdxValue) o;
+
+      return value == pdxValue.value;
+    }
+
+    @Override
+    public int hashCode() {
+      return value;
+    }
+  }
+
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Temporary failure with "Unable to determine PDXType" using WAN
> --------------------------------------------------------------
>
>                 Key: GEODE-4285
>                 URL: https://issues.apache.org/jira/browse/GEODE-4285
>             Project: Geode
>          Issue Type: Bug
>          Components: serialization
>            Reporter: Dan Smith
>            Priority: Major
>              Labels: pull-request-available
>
> We tracked down a race condition in distributing PDX types to the remote side 
> of a WAN site.
> When using a parallel sender, all primaries on the sending side are 
> dispatching the same PDX type in parallel.
> On the receiving side, the first gateway batch will get a distributed lock in 
> PeerTypeRegistration.addRemoteType:
> {code}
> if (!r.containsKey(typeId)) {
>         // This type could actually be for this distributed system,
>         // so we need to make sure the type is published while holding
>         // the distributed lock.
>         lock();
>         try {
>           r.putIfAbsent(typeId, type);
>         } finally {
>           unlock();
>         }
>       }
> {code}
> However, the second gateway batch that is received will continue on without 
> getting the distributed lock because r.containsKey() will return true.
> The second batch could have values that require this type. But without 
> getting the lock, those values can reach members that need the type, 
> potentially before the first batch has finished distributing the type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to