[ 
https://issues.apache.org/jira/browse/GEODE-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260260#comment-16260260
 ] 

ASF GitHub Bot commented on GEODE-3788:
---------------------------------------

jinmeiliao closed pull request #1081: GEODE-3788: add utility methods to get 
the async event queues in the …
URL: https://github.com/apache/geode/pull/1081
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java 
b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
index 887b04abed..9f3f5478ee 100755
--- 
a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
@@ -216,6 +216,11 @@ public abstract DistributedLockServiceMXBean 
getDistributedLockServiceMXBean(
    */
   public abstract Set<ObjectName> queryMBeanNames(DistributedMember member);
 
+  /**
+   * Returns the ids of the async event queues on this member
+   */
+  public abstract Set<String> getAsyncEventQueueIds(DistributedMember member);
+
   /**
    * Returns an instance of an MBean. This is a reference to the MBean 
instance and not a
    * {@link ObjectInstance}.
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
index 60615db12c..d067fc1a91 100755
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
@@ -18,6 +18,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
 
 import javax.management.Notification;
 import javax.management.ObjectName;
@@ -357,6 +358,13 @@ public MemberMXBean getMemberMXBean() {
     }
   }
 
+  @Override
+  public Set<String> getAsyncEventQueueIds(DistributedMember member) {
+    Set<ObjectName> mBeanNames = this.queryMBeanNames(member);
+    return mBeanNames.stream().filter(x -> 
"AsyncEventQueue".equals(x.getKeyProperty("service")))
+        .map(x -> x.getKeyProperty("queue")).collect(Collectors.toSet());
+  }
+
   @Override
   public ObjectName registerMBean(Object object, ObjectName objectName) {
     verifyManagementService();
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
index 02eb8b81a6..686bbc5f83 100755
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
@@ -60,6 +60,7 @@
 import org.apache.geode.management.ManagementService;
 import org.apache.geode.management.cli.Result;
 import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.management.internal.SystemManagementService;
 import org.apache.geode.management.internal.cli.exceptions.UserErrorException;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
 import org.apache.geode.management.internal.cli.result.ResultBuilder;
@@ -406,6 +407,16 @@ public static Result getFunctionResult(ResultCollector<?, 
?> rc, String commandN
     return result;
   }
 
+  public static Set<DistributedMember> 
getMembersWithAsyncEventQueue(InternalCache cache,
+      String queueId) {
+    SystemManagementService managementService =
+        (SystemManagementService) 
ManagementService.getExistingManagementService(cache);
+    Set<DistributedMember> members = findMembers(null, null);
+    return members.stream()
+        .filter(m -> 
managementService.getAsyncEventQueueIds(m).contains(queueId))
+        .collect(Collectors.toSet());
+  }
+
   static class CustomFileFilter implements FileFilter {
     private String extensionWithDot;
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
index 134a15300c..6a30378b3d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
@@ -183,4 +183,8 @@ default ManagementService getManagementService() {
   default Set<DistributedMember> findAnyMembersForRegion(InternalCache cache, 
String regionPath) {
     return CliUtil.getRegionAssociatedMembers(regionPath, cache, false);
   }
+
+  default Set<DistributedMember> findMembersWithAsyncEventQueue(String 
queueId) {
+    return CliUtil.getMembersWithAsyncEventQueue(getCache(), queueId);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
index f1050d8423..d3b97b80da 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
@@ -31,6 +31,7 @@
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.apache.geode.management.internal.cli.exceptions.UserErrorException;
 import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
@@ -184,6 +185,35 @@ public void getMemberByNameOrId() throws Exception {
   }
 
 
+  @Test
+  public void getMembersWithQueueId() throws Exception {
+    gfsh.executeAndAssertThat("create async-event-queue --id=queue1 
--group=group1 --listener="
+        + MyAsyncEventListener.class.getName()).statusIsSuccess();
+    gfsh.executeAndAssertThat("create async-event-queue --id=queue2 
--group=group2 --listener="
+        + MyAsyncEventListener.class.getName()).statusIsSuccess();
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --id=queue --listener=" + 
MyAsyncEventListener.class.getName())
+        .statusIsSuccess();
+
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue1", 2);
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue2", 2);
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue", 4);
+
+    locator.invoke(() -> {
+      members =
+          
CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), 
"queue1");
+      assertThat(getNames(members)).containsExactlyInAnyOrder("member1", 
"member2");
+
+      members =
+          
CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), 
"queue2");
+      assertThat(getNames(members)).containsExactlyInAnyOrder("member3", 
"member4");
+
+      members = 
CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), 
"queue");
+      assertThat(getNames(members)).containsExactlyInAnyOrder("member1", 
"member2", "member3",
+          "member4");
+    });
+  }
+
   private static Set<String> getNames(Set<DistributedMember> members) {
     return 
members.stream().map(DistributedMember::getName).collect(Collectors.toSet());
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java 
b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
index 0e9cfbc323..4e04b8eb8b 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
@@ -137,4 +137,12 @@ public void waitTillDiskstoreIsReady(String diskstoreName, 
int serverCount) {
     vm.invoke(() -> 
LocatorServerStartupRule.memberStarter.waitTillDiskStoreIsReady(diskstoreName,
         serverCount));
   }
+
+  public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int 
serverCount) {
+    vm.invoke(() -> {
+      
LocatorServerStartupRule.memberStarter.waitTillAsyncEventQueuesAreReadyOnServers(queueId,
+          serverCount);
+    });
+  }
+
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
 
b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index da0f588ce1..e2dcc9cc68 100644
--- 
a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++ 
b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -43,6 +43,7 @@
 import org.apache.geode.management.DistributedRegionMXBean;
 import org.apache.geode.management.DistributedSystemMXBean;
 import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.internal.cli.CliUtil;
 import org.apache.geode.security.SecurityManager;
 import 
org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;
 
@@ -205,9 +206,9 @@ protected void normalizeProperties() {
       if (properties.containsKey(NAME)) {
         name = properties.getProperty(NAME);
       } else {
-        if (this instanceof ServerStarterRule)
+        if (this instanceof ServerStarterRule) {
           name = "server";
-        else {
+        } else {
           name = "locator";
         }
       }
@@ -263,6 +264,11 @@ public void waitTillDiskStoreIsReady(String diskstoreName, 
int serverCount) {
         .until(() -> getDiskStoreCount(diskstoreName) == serverCount);
   }
 
+  public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int 
serverCount) {
+    await().atMost(2, TimeUnit.SECONDS).until(
+        () -> CliUtil.getMembersWithAsyncEventQueue(getCache(), 
queueId).size() == serverCount);
+  }
+
   abstract void stopMember();
 
   @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> alter async event queue attributes
> ----------------------------------
>
>                 Key: GEODE-3788
>                 URL: https://issues.apache.org/jira/browse/GEODE-3788
>             Project: Geode
>          Issue Type: Sub-task
>          Components: gfsh
>            Reporter: Swapnil Bawaskar
>
> We should add a new {{alter async-event-queue}} gfsh command that will allow 
> users to change the following attributes on the AsyncEventQueue:
> - batch size
> - batch time interval
> - maximum queue memory
> Attributes changed with this command should only be reflected in cluster 
> configuration. We will require users to do a rolling re-start of the servers 
> for the new settings to take effect.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to