This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7038
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 23727962252d5cbdf5dd7cccbd91ab65eca3ab19
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Wed Jul 31 14:05:03 2019 -0700

    GEODE-7038: After auto-reconnect a server's multicast communications aren't 
working correctly
    
    Ensure that a JoinResponseMessage is sent if multicast is enabled.  This
    allows JGroupsMessenger to piggy-back a multicast message digest on the
    response that the new process can install in its JGroups stack to ensure
    that multicast messaging is properly initialized.
    
    I've also replaced complex checks for whether UDP security is enabled
    with a simpler check on ServiceConfig.  When UDP security is enabled we
    are already sending a JoinResponseMessage and so we don't need to send
    another one if multicast is enabled.
---
 .../DistributedMulticastRegionDUnitTest.java       | 94 ++++++++++++++++++----
 .../internal/membership/gms/ServiceConfig.java     |  4 +
 .../membership/gms/membership/GMSJoinLeave.java    | 24 ++++--
 .../gms/messages/JoinRequestMessage.java           |  9 +++
 .../gms/messages/JoinResponseMessage.java          |  1 -
 .../membership/gms/messenger/JGroupsMessenger.java |  4 +-
 6 files changed, 114 insertions(+), 22 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java
index 22561c0..9cfaa47 100755
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java
@@ -14,21 +14,26 @@
  */
 package org.apache.geode.cache30;
 
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISABLE_AUTO_RECONNECT;
 import static 
org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static 
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_TTL;
 import static org.apache.geode.distributed.ConfigurationProperties.NAME;
 import static 
org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
 import static 
org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.AttributesFactory;
@@ -40,11 +45,15 @@ import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.OSProcess;
 import org.apache.geode.internal.cache.CachedDeserializableFactory;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxReader;
 import org.apache.geode.pdx.PdxSerializable;
 import org.apache.geode.pdx.PdxSerializationException;
 import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.SerializableCallable;
@@ -55,12 +64,17 @@ import 
org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 
 public class DistributedMulticastRegionDUnitTest extends JUnit4CacheTestCase {
 
-  static int locatorVM = 3;
-  static String mcastport = "42786";
-  static String mcastttl = "0";
+  int locatorVM = 3;
+  String mcastport = "0";
+  String mcastttl = "0";
 
   private int locatorPort;
 
+  @Before
+  public void setup() {
+    mcastport = 
String.valueOf(AvailablePortHelper.getRandomAvailableUDPPort());
+  }
+
   @Override
   public final void preSetUp() throws Exception {
     clean();
@@ -111,24 +125,74 @@ public class DistributedMulticastRegionDUnitTest extends 
JUnit4CacheTestCase {
       @Override
       public void run2() throws CacheException {
         final Region region = getRootRegion().getSubregion(name);
-        for (int i = 0; i < 5; i++) {
+        for (int i = 0; i < 50; i++) {
           region.put(i, i);
         }
       }
     };
 
     vm0.invoke(doPuts);
+    vm0.invoke(() -> validateMulticastOpsAfterRegionOps());
+    vm1.invoke(() -> validateMulticastOpsAfterRegionOps());
 
-    SerializableRunnable validateMulticastAfterRegionOps =
-        new CacheSerializableRunnable("validateMulticast after region ops") {
-          @Override
-          public void run2() throws CacheException {
-            validateMulticastOpsAfterRegionOps();
-          }
-        };
+    closeLocator();
+  }
+
+  @Test
+  public void testMulticastAfterReconnect() {
+    final String name = "mcastRegion";
+    SerializableRunnable create = new CacheSerializableRunnable("Create 
Region") {
+      @Override
+      public void run2() throws CacheException {
+        createRegion(name, getRegionAttributes());
+      }
+    };
+
+    locatorPort = startLocator();
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    // 1. start locator with mcast port
+    vm0.invoke(create);
+    vm1.invoke(create);
+    // There is possibility that you may get this packet from other tests
+    /*
+     * SerializableRunnable validateMulticastBeforeRegionOps = new
+     * CacheSerializableRunnable("validateMulticast before region ops") { 
public void run2() throws
+     * CacheException { validateMulticastOpsBeforeRegionOps(); } };
+     *
+     * vm0.invoke(validateMulticastBeforeRegionOps); 
vm1.invoke(validateMulticastBeforeRegionOps);
+     */
+
+    SerializableRunnable doPuts = new CacheSerializableRunnable("do put") {
+      @Override
+      public void run2() throws CacheException {
+        final Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 50; i++) {
+          region.put(i, i);
+        }
+      }
+    };
+
+    vm0.invoke(doPuts);
+
+    DistributedTestUtils.crashDistributedSystem(vm1);
+    vm0.invoke(doPuts);
+    vm1.invoke(() -> {
+      basicGetCache().waitUntilReconnected(30, TimeUnit.SECONDS);
+      assertNotNull(basicGetCache().getReconnectedCache());
+      cache = (InternalCache) basicGetCache().getReconnectedCache();
+      system = cache.getInternalDistributedSystem();
+    });
+    vm0.invoke(doPuts);
+    vm0.invoke(() -> {
+      GeodeAwaitility.await().until(() -> {
+        getCache().close();
+        return true;
+      });
+    });
 
-    vm0.invoke(validateMulticastAfterRegionOps);
-    vm1.invoke(validateMulticastAfterRegionOps);
+    vm1.invoke(() -> validateMulticastOpsAfterRegionOps());
 
     closeLocator();
   }
@@ -220,8 +284,10 @@ public class DistributedMulticastRegionDUnitTest extends 
JUnit4CacheTestCase {
   @Override
   public Properties getDistributedSystemProperties() {
     Properties p = new Properties();
+    p.put(DISABLE_AUTO_RECONNECT, "false");
+    p.put(MAX_WAIT_TIME_RECONNECT, "20");
     p.put(STATISTIC_SAMPLING_ENABLED, "true");
-    p.put(STATISTIC_ARCHIVE_FILE, "multicast");
+    p.put(STATISTIC_ARCHIVE_FILE, "multicast" + OSProcess.getId());
     p.put(ENABLE_TIME_STATISTICS, "true");
     p.put(MCAST_PORT, mcastport);
     p.put(MCAST_TTL, mcastttl);
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
index 0fe55f0..b70e48a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
@@ -85,6 +85,10 @@ public class ServiceConfig {
     return networkPartitionDetectionEnabled;
   }
 
+  public boolean isUDPSecurityEnabled() {
+    return !dconfig.getSecurityUDPDHAlgo().isEmpty();
+  }
+
   public boolean areLocatorsPreferredAsCoordinators() {
     boolean locatorsAreCoordinators;
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 32452c3..f38b20e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -732,13 +732,14 @@ public class GMSJoinLeave implements JoinLeave {
       synchronized (viewRequests) {
         if (request instanceof JoinRequestMessage) {
           if (isCoordinator
-              && 
!services.getConfig().getDistributionConfig().getSecurityUDPDHAlgo().isEmpty()) 
{
+              && services.getConfig().isUDPSecurityEnabled()) {
             services.getMessenger().initClusterKey();
             JoinRequestMessage jreq = (JoinRequestMessage) request;
             // this will inform about cluster-secret key, as we have 
authenticated at this point
             JoinResponseMessage response = new 
JoinResponseMessage(jreq.getSender(),
                 services.getMessenger().getClusterSecretKey(), 
jreq.getRequestId());
             services.getMessenger().send(response);
+            jreq.setResponseSent();
           }
         }
         logger.debug("Recording the request to be processed in the next 
membership view");
@@ -753,17 +754,20 @@ public class GMSJoinLeave implements JoinLeave {
 
   private void sendDHKeys() {
     if (isCoordinator
-        && 
!services.getConfig().getDistributionConfig().getSecurityUDPDHAlgo().isEmpty()) 
{
+        && services.getConfig().isUDPSecurityEnabled()) {
       synchronized (viewRequests) {
         for (DistributionMessage request : viewRequests) {
           if (request instanceof JoinRequestMessage) {
 
             services.getMessenger().initClusterKey();
             JoinRequestMessage jreq = (JoinRequestMessage) request;
-            // this will inform about cluster-secret key, as we have 
authenticated at this point
-            JoinResponseMessage response = new 
JoinResponseMessage(jreq.getSender(),
-                services.getMessenger().getClusterSecretKey(), 
jreq.getRequestId());
-            services.getMessenger().send(response);
+            if (!jreq.isResponseSent()) {
+              // this will inform about cluster-secret key, as we have 
authenticated at this point
+              JoinResponseMessage response = new 
JoinResponseMessage(jreq.getSender(),
+                  services.getMessenger().getClusterSecretKey(), 
jreq.getRequestId());
+              services.getMessenger().send(response);
+              jreq.setResponseSent();
+            }
           }
         }
       }
@@ -2459,6 +2463,14 @@ public class GMSJoinLeave implements JoinLeave {
               } else {
                 joinReqs.add(mbr);
                 joinPorts.put(mbr, port);
+                if (!jmsg.isResponseSent()
+                    && services.getConfig().getTransport().isMcastEnabled()) {
+                  // send a join response so the new member can get the 
multicast messaging digest.
+                  JoinResponseMessage response = new 
JoinResponseMessage(jmsg.getSender(),
+                      services.getMessenger().getClusterSecretKey(), 
jmsg.getRequestId());
+                  services.getMessenger().send(response);
+                  jmsg.setResponseSent();
+                }
               }
             }
             break;
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 1c565d5..922d0ba 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -30,6 +30,7 @@ public class JoinRequestMessage extends 
HighPriorityDistributionMessage {
   private Object credentials;
   private int failureDetectionPort = -1;
   private int requestId;
+  private transient boolean responseSent;
 
   public JoinRequestMessage(InternalDistributedMember coord, 
InternalDistributedMember id,
       Object credentials, int fdPort, int requestId) {
@@ -133,4 +134,12 @@ public class JoinRequestMessage extends 
HighPriorityDistributionMessage {
       return false;
     return true;
   }
+
+  public void setResponseSent() {
+    responseSent = true;
+  }
+
+  public boolean isResponseSent() {
+    return responseSent;
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java
index b97d11a..e105d6a 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java
@@ -174,5 +174,4 @@ public class JoinResponseMessage extends 
HighPriorityDistributionMessage {
     return true;
   }
 
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index c0561f3..7ed141d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -412,7 +412,7 @@ public class JGroupsMessenger implements Messenger {
 
   @Override
   public void started() {
-    if (queuedMessagesFromReconnect != null) {
+    if (queuedMessagesFromReconnect != null && 
!services.getConfig().isUDPSecurityEnabled()) {
       logger.info("Delivering {} messages queued by quorum checker",
           queuedMessagesFromReconnect.size());
       for (Message message : queuedMessagesFromReconnect) {
@@ -1196,6 +1196,8 @@ public class JGroupsMessenger implements Messenger {
             this.myChannel.getProtocolStack().getTopProtocol()
                 .down(new Event(Event.MERGE_DIGEST, digest));
             jrsp.setMessengerData(null);
+            digest = (Digest) 
this.myChannel.getProtocolStack().getTopProtocol()
+                .down(Event.GET_DIGEST_EVT);
           } catch (Exception e) {
             logger.fatal("Unable to read JGroups messaging digest", e);
           }

Reply via email to