This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new c4e5a034d8 GEODE-10421: Improve start gw sender with clean-queue
(#7856)
c4e5a034d8 is described below
commit c4e5a034d8cccb0a2814221d0cd3a8c5242e913d
Author: Mario Ivanac <[email protected]>
AuthorDate: Fri Sep 16 10:40:20 2022 +0200
GEODE-10421: Improve start gw sender with clean-queue (#7856)
* GEODE-10421: added check gw status
* GEODE-10421: added TC
* GEODE-10421: add document impacts
* GEODE-10421: update after comments
---
.../geode/management/internal/i18n/CliStrings.java | 7 ++
.../gfsh/command-pages/start.html.md.erb | 2 +-
.../cli/commands/StartGatewaySenderCommand.java | 38 +++++++++
.../StartGatewaySenderCommandDUnitTest.java | 91 ++++++++++++++++++++++
4 files changed, 137 insertions(+), 1 deletion(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
index f533c77960..3734eedce4 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
@@ -3007,6 +3007,13 @@ public class CliStrings {
public static final String GATEWAY_RECEIVER_IS_NOT_AVAILABLE_ON_MEMBER_0 =
"GatewayReceiver is not available on member {0}";
+ public static final String START_GATEWAYSENDER_REJECTED = "Command rejected.
Reasons:";
+
+ public static final String REJECT_START_GATEWAYSENDER_REASON = "Reasons
command is rejected";
+
+ public static final String EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS =
+ "Command must be executed on all members on which gateway sender is
created";
+
public static final String GATEWAY_SENDER_IS_NOT_AVAILABLE = "GatewaySender
is not available";
public static final String GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1 =
"GatewaySender {0} is already started on member {1}";
diff --git a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb
b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb
index f25b6b5be0..b41bfb02bb 100644
--- a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb
@@ -121,7 +121,7 @@ start gateway-sender --id=value [--groups=value(,value)*]
[--members=value(,valu
| ‑‑id | *Required.* ID of the GatewaySender.
| |
| ‑‑groups | Group(s) of members on which to start the
Gateway Sender. | |
| ‑‑members | Member(s) on which to start the Gateway Sender.
| |
-| ‑‑clean-queues | Option to clean existing queue at start of the
Gateway Sender. This option is only applicable for Gateway Senders with enabled
persistence. | false |
+| ‑‑clean-queues | Option to clean existing queue at start of the
Gateway Sender. This option is accepted only when the command targets every member on which the
Gateway Sender is created and the Gateway Sender is not already running on any of those members. | false |
**Example Commands:**
diff --git
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
index 996e85be8e..d02277d4ac 100644
---
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
+++
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
@@ -77,6 +77,44 @@ public class StartGatewaySenderCommand extends GfshCommand {
return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
}
+    if (cleanQueues) {
+      // A start with clean-queues is all-or-nothing: every member that exposes a
+      // GatewaySenderMXBean for this sender must be among the targeted members (dsMembers),
+      // and none of those senders may already be running.
+      // NOTE(review): bean lookups below use senderId while the rejection text uses id;
+      // confirm both hold the same value.
+      final ResultModel rejection =
+          ResultModel.createError(CliStrings.START_GATEWAYSENDER_REJECTED);
+      final TabularResultModel rejectionReasons =
+          rejection.addTable(CliStrings.REJECT_START_GATEWAYSENDER_REASON);
+      boolean anySenderRunning = false;
+
+      // presumably findMembers(null, null) applies no group/member filter - TODO confirm
+      for (DistributedMember candidate : findMembers(null, null)) {
+        final boolean isLocalMember = cache.getDistributedSystem().getDistributedMember().getId()
+            .equals(candidate.getId());
+
+        // The local member is asked for its own bean; every other member goes through a proxy.
+        final GatewaySenderMXBean senderBean;
+        if (isLocalMember) {
+          senderBean = service.getLocalGatewaySenderMXBean(senderId);
+        } else {
+          senderBean = service.getMBeanProxy(
+              service.getGatewaySenderMBeanName(candidate, senderId), GatewaySenderMXBean.class);
+        }
+
+        if (senderBean == null) {
+          // No sender bean was found for this member, so there is nothing to validate.
+          continue;
+        }
+
+        if (!dsMembers.contains(candidate)) {
+          // A member with the sender bean was left out of the target set: refuse immediately.
+          return ResultModel.createError(CliStrings.EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS);
+        }
+
+        if (senderBean.isRunning()) {
+          // Keep scanning so that every running member gets its own row in the rejection table.
+          anySenderRunning = true;
+          rejectionReasons.addMemberStatusResultRow(candidate.getId(), CliStrings.GATEWAY_ERROR,
+              CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1, id,
+                  candidate.getId()));
+        }
+      }
+
+      if (anySenderRunning) {
+        return rejection;
+      }
+    }
+
ExecutorService execService =
LoggingExecutors.newCachedThreadPool("Start Sender Command Thread ",
true);
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
index edff65d582..b41ac9f916 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
@@ -19,6 +19,7 @@ import static
org.apache.geode.distributed.ConfigurationProperties.GROUPS;
import static
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.createSender;
import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.startSender;
import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
import static org.assertj.core.api.Assertions.assertThat;
@@ -431,6 +432,96 @@ public class StartGatewaySenderCommandDUnitTest implements
Serializable {
server3.invoke(() -> verifySenderState("ln", true, false));
}
+  /**
+   * Creates sender "ln" on three servers, starts it on server1 only, then issues a start command
+   * with clean-queues and no member filter. The command must report an ERROR status and the
+   * rejection-reason table must hold exactly one "Error" entry in its "Result" column.
+   */
+  @Test
+  public void testStartGatewaySender_clean_queues_sender_on_one_server_allready_started()
+      throws Exception {
+    final Integer site1LocatorPort = locatorSite1.getPort();
+
+    // Site #1: three servers, each with sender "ln" created
+    server1 = clusterStartupRule.startServerVM(3, site1LocatorPort);
+    server2 = clusterStartupRule.startServerVM(4, site1LocatorPort);
+    server3 = clusterStartupRule.startServerVM(5, site1LocatorPort);
+
+    server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+    server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+    server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+    // Only server1 gets its sender started before the command under test is issued.
+    server1.invoke(() -> startSender("ln"));
+
+    // Precondition checked on each server and again through the locator's MXBean proxies
+    // (presumably the boolean arguments are running/paused - TODO confirm against WANCommandUtils).
+    server1.invoke(() -> verifySenderState("ln", true, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", true, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false));
+
+    final String idOption = " --" + CliStrings.START_GATEWAYSENDER__ID + "=ln";
+    final String cleanQueuesOption = " --" + CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + "=true";
+    final CommandResult outcome = executeCommandWithIgnoredExceptions(
+        CliStrings.START_GATEWAYSENDER + idOption + cleanQueuesOption);
+
+    assertThat(outcome).isNotNull();
+    assertThat(outcome.getStatus()).isSameAs(Result.Status.ERROR);
+
+    final TabularResultModel rejectionReasons =
+        outcome.getResultData().getTableSection(CliStrings.REJECT_START_GATEWAYSENDER_REASON);
+    final List<String> resultColumn = rejectionReasons.getValuesInColumn("Result");
+    assertThat(resultColumn).containsExactlyInAnyOrder("Error");
+  }
+
+
+  /**
+   * Creates sender "ln" on three servers (none started), then issues a start command with
+   * clean-queues restricted to server1 only. The command must report an ERROR status and the
+   * sender must remain not running on all three servers.
+   */
+  @Test
+  public void testStartGatewaySender_clean_queues_on_one_member() throws Exception {
+    Integer locator1Port = locatorSite1.getPort();
+
+    // setup servers in Site #1
+    server1 = clusterStartupRule.startServerVM(3, locator1Port);
+    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    server3 = clusterStartupRule.startServerVM(5, locator1Port);
+
+    DistributedMember vm1Member = getMember(server1.getVM());
+
+    server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+    server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+    server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+    // Precondition: the sender is not running anywhere.
+    server1.invoke(() -> verifySenderState("ln", false, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false));
+
+    // Every option needs its own " --" prefix. Without the separator the clean-queues key and the
+    // member key are concatenated into a single token, so the command never carries both options
+    // and the ERROR asserted below would not come from the all-members check under test.
+    // NOTE(review): consider also asserting on CliStrings.EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS in
+    // the result so an unrelated failure cannot satisfy this test.
+    String command =
+        CliStrings.START_GATEWAYSENDER + " --" + CliStrings.START_GATEWAYSENDER__ID + "=ln --"
+            + CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + "=true --" + CliStrings.MEMBER + "="
+            + vm1Member.getId();
+    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
+    assertThat(cmdResult).isNotNull();
+    assertThat(cmdResult.getStatus()).isSameAs(Result.Status.ERROR);
+
+    // The rejected command must not have started the sender on any member.
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), "ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), "ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), "ln", false, false));
+
+    server1.invoke(() -> verifySenderState("ln", false, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+  }
+
+
private CommandResult executeCommandWithIgnoredExceptions(String command)
throws Exception {
try (IgnoredException ie = IgnoredException.addIgnoredException("Could not
connect")) {