This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-4451-2
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-4451-2 by this 
push:
     new 841ef33  GEODE-4451: Prevented sender from being created when members 
aren't all current version
841ef33 is described below

commit 841ef33eaa7c63fd2d3b05251195e2df0c1f0122
Author: Barry Oglesby <[email protected]>
AuthorDate: Mon Mar 19 12:01:13 2018 -0700

    GEODE-4451: Prevented sender from being created when members aren't all 
current version
---
 .../cli/commands/CreateGatewaySenderCommand.java   |  13 +++
 .../management/internal/cli/i18n/CliStrings.java   |   2 +
 .../commands/CreateGatewaySenderCommandTest.java   |  23 ++++
 .../cache/wan/WANRollingUpgradeDUnitTest.java      | 121 +++++++++++++++++++--
 4 files changed, 149 insertions(+), 10 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
index 8a86c09..b132852 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
@@ -23,6 +23,8 @@ import org.springframework.shell.core.annotation.CliOption;
 
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
 import org.apache.geode.management.cli.CliMetaData;
 import org.apache.geode.management.cli.ConverterHint;
 import org.apache.geode.management.cli.Result;
@@ -116,6 +118,12 @@ public class CreateGatewaySenderCommand extends 
GfshCommand {
 
     Set<DistributedMember> membersToCreateGatewaySenderOn = 
getMembers(onGroups, onMember);
 
+    // Don't allow sender to be created if all members are not the current 
version.
+    if (!verifyAllCurrentVersion(membersToCreateGatewaySenderOn)) {
+      return ResultBuilder.createUserErrorResult(
+          
CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
+    }
+
     List<CliFunctionResult> gatewaySenderCreateResults =
         executeAndGetFunctionResult(GatewaySenderCreateFunction.INSTANCE, 
gatewaySenderFunctionArgs,
             membersToCreateGatewaySenderOn);
@@ -139,6 +147,11 @@ public class CreateGatewaySenderCommand extends 
GfshCommand {
     return result;
   }
 
+  private boolean verifyAllCurrentVersion(Set<DistributedMember> members) {
+    return members.stream().allMatch(
+        member -> ((InternalDistributedMember) 
member).getVersionObject().equals(Version.CURRENT));
+  }
+
   public static class Interceptor extends AbstractCliAroundInterceptor {
     @Override
     public Result preExecution(GfshParseResult parseResult) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
index fe8650c..4f2b0a8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
@@ -2248,6 +2248,8 @@ public class CliStrings {
       "Could not instantiate class \"{0}\" specified for \"{1}\".";
   public static final String 
CREATE_GATEWAYSENDER__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 =
       "Could not access class \"{0}\" specified for \"{1}\".";
+  public static final String 
CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS =
+      "Gateway Sender cannot be created until all members are the current 
version";
 
   /* stop gateway-receiver */
   public static final String START_GATEWAYSENDER = "start gateway-sender";
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
index f8a0ae3..d87c37c 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -33,9 +34,13 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterConfigurationService;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
 import org.apache.geode.management.internal.configuration.domain.XmlEntity;
 import org.apache.geode.test.junit.categories.UnitTest;
 import org.apache.geode.test.junit.rules.GfshParserRule;
@@ -148,4 +153,22 @@ public class CreateGatewaySenderCommandTest {
         .hasNoFailToPersistError();
     verify(ccService, never()).deleteXmlEntity(any(), any());
   }
+
+  @Test
+  public void whenMembersAreDifferentVersions() {
+    // Create a set of mixed version members
+    Set<DistributedMember> members = new HashSet<>();
+    InternalDistributedMember currentVersionMember = 
mock(InternalDistributedMember.class);
+    doReturn(Version.CURRENT).when(currentVersionMember).getVersionObject();
+    InternalDistributedMember oldVersionMember = 
mock(InternalDistributedMember.class);
+    doReturn(Version.GEODE_140).when(oldVersionMember).getVersionObject();
+    members.add(currentVersionMember);
+    members.add(oldVersionMember);
+    doReturn(members).when(command).getMembers(any(), any());
+
+    // Verify executing the command fails
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 
--remote-distributed-system-id=1").statusIsError()
+        
.containsOutput(CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
+  }
 }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java
index e0b7dfd..570c3b6 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java
@@ -19,14 +19,13 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.logging.log4j.Logger;
 import org.awaitility.Awaitility;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -44,7 +43,6 @@ import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.AvailablePortHelper;
@@ -52,7 +50,8 @@ import 
org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.parallel.BatchRemovalThreadHelper;
 import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
-import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
@@ -64,6 +63,7 @@ import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
 import 
org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 @SuppressWarnings("ConstantConditions")
@@ -85,6 +85,9 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
   // the old version of Geode we're testing against
   private String oldVersion;
 
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
   public WANRollingUpgradeDUnitTest(String version) {
     oldVersion = version;
   }
@@ -500,7 +503,7 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
     VM site1Server2 = host.getVM(oldVersion, 2);
     VM site1Client = host.getVM(oldVersion, 3);
 
-    // Get old site members
+    // Get current site members
     VM site2Locator = host.getVM(VersionManager.CURRENT_VERSION, 4);
     VM site2Server1 = host.getVM(VersionManager.CURRENT_VERSION, 5);
     VM site2Server2 = host.getVM(VersionManager.CURRENT_VERSION, 6);
@@ -512,7 +515,7 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
     final String site1Locators = hostName + "[" + site1LocatorPort + "]";
     final int site1DistributedSystemId = 0;
 
-    // Get old site locator properties
+    // Get current site locator properties
     final int site2LocatorPort = 
AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
     DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort);
     final String site2Locators = hostName + "[" + site2LocatorPort + "]";
@@ -530,7 +533,7 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
                 
!InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
                     || 
InternalLocator.getLocator().isSharedConfigurationRunning())));
 
-    // Start old site locator
+    // Start current site locator
     site2Locator.invoke(() -> startLocator(site2LocatorPort, 
site2DistributedSystemId,
         site2Locators, site1Locators));
 
@@ -548,12 +551,12 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
     rollStartAndConfigureServerToCurrent(site1Server2, site1Locators, 
site2DistributedSystemId,
         regionName, site1SenderId, 
ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
 
-    // Start and configure old site servers
+    // Start and configure old current servers
     String site2SenderId = getName() + "_gatewaysender_" + 
site1DistributedSystemId;
     startAndConfigureServers(site2Server1, site2Server2, site2Locators, 
site1DistributedSystemId,
         regionName, site2SenderId, 
ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
 
-    // Do puts from mixed site client and verify events on old site
+    // Do puts from mixed site client and verify events on current site
     int numPuts = 100;
     doClientPutsAndVerifyEvents(site1Client, site1Server1, site1Server2, 
site2Server1, site2Server2,
         hostName, site1LocatorPort, regionName, numPuts, site1SenderId, false);
@@ -561,6 +564,23 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
 
   private void startLocator(int port, int distributedSystemId, String locators,
       String remoteLocators) throws IOException {
+    Properties props = getLocatorProperties(distributedSystemId, locators, 
remoteLocators);
+    Locator.startLocatorAndDS(port, null, props);
+  }
+
+  private int startLocatorWithJmxManager(int port, int distributedSystemId, 
String locators,
+      String remoteLocators) throws IOException {
+    Properties props = getLocatorProperties(distributedSystemId, locators, 
remoteLocators);
+    int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    props.put(DistributionConfig.JMX_MANAGER_PORT_NAME, 
String.valueOf(jmxPort));
+    props.put(DistributionConfig.JMX_MANAGER_NAME, "true");
+    props.put(DistributionConfig.JMX_MANAGER_START_NAME, "true");
+    Locator.startLocatorAndDS(port, null, props);
+    return jmxPort;
+  }
+
+  private Properties getLocatorProperties(int distributedSystemId, String 
locators,
+      String remoteLocators) {
     Properties props = new Properties();
     props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME,
@@ -569,7 +589,7 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
     props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, remoteLocators);
     props.setProperty(DistributionConfig.LOG_LEVEL_NAME, 
DUnitLauncher.logLevel);
     props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, 
"false");
-    Locator.startLocatorAndDS(port, null, props);
+    return props;
   }
 
   private void stopLocator() throws Exception {
@@ -707,6 +727,87 @@ public class WANRollingUpgradeDUnitTest extends 
JUnit4CacheTestCase {
     }
   }
 
+  @Test
+  public void testCreateGatewaySenderMixedSiteOneCurrentSiteTwo() throws 
Exception {
+    final Host host = Host.getHost(0);
+
+    // Get mixed site members
+    VM site1Locator = host.getVM(oldVersion, 0);
+    VM site1Server1 = host.getVM(oldVersion, 1);
+    VM site1Server2 = host.getVM(oldVersion, 2);
+
+    // Get current site members
+    VM site2Locator = host.getVM(VersionManager.CURRENT_VERSION, 4);
+    VM site2Server1 = host.getVM(VersionManager.CURRENT_VERSION, 5);
+    VM site2Server2 = host.getVM(VersionManager.CURRENT_VERSION, 6);
+
+    // Get mixed site locator properties
+    String hostName = NetworkUtils.getServerHostName(host);
+    final int site1LocatorPort = 
AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort);
+    final String site1Locators = hostName + "[" + site1LocatorPort + "]";
+    final int site1DistributedSystemId = 0;
+
+    // Get current site locator properties
+    final int site2LocatorPort = 
AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort);
+    final String site2Locators = hostName + "[" + site2LocatorPort + "]";
+    final int site2DistributedSystemId = 1;
+
+    // Start mixed site locator
+    site1Locator.invoke(() -> startLocator(site1LocatorPort, 
site1DistributedSystemId,
+        site1Locators, site2Locators));
+
+    // Locators before 1.4 handled configuration asynchronously.
+    // We must wait for configuration configuration to be ready, or confirm 
that it is disabled.
+    site1Locator.invoke(
+        () -> Awaitility.await().atMost(65, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+            .until(() -> assertTrue(
+                
!InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
+                    || 
InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+    // Start current site locator
+    site2Locator.invoke(() -> startLocator(site2LocatorPort, 
site2DistributedSystemId,
+        site2Locators, site1Locators));
+
+    // Start current site servers with receivers
+    site2Server1.invoke(() -> createCache(site2Locators));
+    site2Server1.invoke(() -> createGatewayReceiver());
+    site2Server2.invoke(() -> createCache(site2Locators));
+    site2Server2.invoke(() -> createGatewayReceiver());
+
+    // Start mixed site servers
+    site1Server1.invoke(() -> createCache(site1Locators));
+    site1Server2.invoke(() -> createCache(site1Locators));
+
+    // Roll mixed site locator to current with jmx manager
+    site1Locator.invoke(() -> stopLocator());
+    VM site1RolledLocator = host.getVM(VersionManager.CURRENT_VERSION, 
site1Locator.getId());
+    int jmxManagerPort =
+        site1RolledLocator.invoke(() -> 
startLocatorWithJmxManager(site1LocatorPort,
+            site1DistributedSystemId, site1Locators, site2Locators));
+
+    // Roll one mixed site server to current
+    site1Server2.invoke(() -> closeCache());
+    VM site1Server2RolledServer = host.getVM(VersionManager.CURRENT_VERSION, 
site1Server2.getId());
+    site1Server2RolledServer.invoke(() -> createCache(site1Locators));
+
+    // Use gfsh to attempt to create a gateway sender in the mixed site servers
+    this.gfsh.connectAndVerify(jmxManagerPort, 
GfshCommandRule.PortType.jmxManager);
+    this.gfsh
+        .executeAndAssertThat(getCreateGatewaySenderCommand("toSite2", 
site2DistributedSystemId))
+        .statusIsError()
+        
.containsOutput(CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
+  }
+
+  private String getCreateGatewaySenderCommand(String id, int remoteDsId) {
+    CommandStringBuilder csb = new 
CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+    csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, id);
+    csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID,
+        String.valueOf(remoteDsId));
+    return csb.toString();
+  }
+
   private void createCache(String locators) {
     Properties props = new Properties();
     props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to