Repository: geode Updated Branches: refs/heads/develop 4e61799f9 -> e5121abad
GEODE-2568: The JMX MBean is now removed when its AsyncEventQueue is removed Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/e5121aba Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/e5121aba Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/e5121aba Branch: refs/heads/develop Commit: e5121abad3b4c11620b0be037ada4484e7248875 Parents: 4e61799 Author: Barry Oglesby <[email protected]> Authored: Wed Mar 1 13:20:31 2017 -0800 Committer: Barry Oglesby <[email protected]> Committed: Fri Mar 3 15:08:53 2017 -0800 ---------------------------------------------------------------------- .../distributed/internal/ResourceEvent.java | 1 + .../geode/internal/cache/GemFireCacheImpl.java | 1 + .../geode/management/JMXNotificationType.java | 8 ++ .../internal/ManagementConstants.java | 2 + .../internal/beans/AsyncEventQueueMBean.java | 3 + .../beans/AsyncEventQueueMBeanBridge.java | 3 + .../internal/beans/ManagementAdapter.java | 36 ++++++++ .../internal/beans/ManagementListener.java | 4 + .../geode/internal/cache/wan/WANTestBase.java | 6 ++ .../management/WANManagementDUnitTest.java | 92 ++++++++++++++------ 10 files changed, 131 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java index acfb157..b004113 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java @@ -34,6 +34,7 @@ public enum ResourceEvent { MANAGER_STOP, LOCATOR_START, ASYNCEVENTQUEUE_CREATE, + ASYNCEVENTQUEUE_REMOVE, SYSTEM_ALERT, CACHE_SERVER_START, CACHE_SERVER_STOP, http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 66f1a4a..fcf7a2a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -4131,6 +4131,7 @@ public class GemFireCacheImpl this.allAsyncEventQueues.remove(asyncQueue); this.allVisibleAsyncEventQueues.remove(asyncQueue); } + system.handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_REMOVE, asyncQueue); } /* Cache API - get the conflict resolver for WAN */ http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java index 3fb8ba2..ba3daeb 100644 --- a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java +++ b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java @@ -167,6 +167,14 @@ public interface JMXNotificationType { DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.asycn.event.queue.created"; /** + * Notification type which indicates that an async queue has been closed. <BR> + * The value of this type string is + * <CODE>gemfire.distributedsystem.async.event.queue.closed</CODE>. + */ + public static final String ASYNC_EVENT_QUEUE_CLOSED = + DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.async.event.queue.closed"; + + /** * Notification type which indicates a GemFire system generated alert <BR> * The value of this type string is <CODE>system.alert</CODE>. */ http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java index d5bfedb..151aa9d 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java @@ -187,6 +187,8 @@ public interface ManagementConstants { public static final String ASYNC_EVENT_QUEUE_CREATED_PREFIX = "Async Event Queue is Created in the VM "; + public static final String ASYNC_EVENT_QUEUE_CLOSED_PREFIX = + "Async Event Queue is Closed in the VM "; public static final String CACHE_SERVICE_CREATED_PREFIX = "Cache Service Created With Name "; http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java index 2f2757f..7cea8e9 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java @@ -107,4 +107,7 @@ public class AsyncEventQueueMBean extends NotificationBroadcasterSupport return bridge.getEventQueueSize(); } + public void stopMonitor() { + bridge.stopMonitor(); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java index e57bc55..83cfb88 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java @@ -114,4 +114,7 @@ public class AsyncEventQueueMBeanBridge { } } + public void stopMonitor() { + monitor.stopListener(); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java index f601a9a..183a5a8 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java @@ -592,6 +592,42 @@ public class ManagementAdapter { } /** + * Handles AsyncEventQueue Removal + * + * @param queue The AsyncEventQueue being removed + */ + public void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException { + if (!isServiceInitialised("handleAsyncEventQueueRemoval")) { + return; + } + + ObjectName asycnEventQueueMBeanName = MBeanJMXAdapter.getAsycnEventQueueMBeanName( + cacheImpl.getDistributedSystem().getDistributedMember(), queue.getId()); + AsyncEventQueueMBean bean = null; + try { + bean = (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean(queue.getId()); + if (bean == null) { + return; + } + } catch (ManagementException e) { + // If no bean found its a NO-OP + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + return; + } + + bean.stopMonitor(); + + service.unregisterMBean(asycnEventQueueMBeanName); + + Notification notification = new Notification(JMXNotificationType.ASYNC_EVENT_QUEUE_CLOSED, + memberSource, SequenceNumber.next(), System.currentTimeMillis(), + ManagementConstants.ASYNC_EVENT_QUEUE_CLOSED_PREFIX + queue.getId()); + memberLevelNotifEmitter.sendNotification(notification); + } + + /** * Sends the alert with the Object source as member. This notification will get filtered out for * particular alert level * http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java index 12392b5..d841122 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java @@ -185,6 +185,10 @@ public class ManagementListener implements ResourceEventsListener { AsyncEventQueue queue = (AsyncEventQueue) resource; adapter.handleAsyncEventQueueCreation(queue); break; + case ASYNCEVENTQUEUE_REMOVE: + AsyncEventQueue removedQueue = (AsyncEventQueue) resource; + adapter.handleAsyncEventQueueRemoval(removedQueue); + break; case SYSTEM_ALERT: AlertDetails details = (AlertDetails) resource; adapter.handleSystemNotification(details); http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index f0704e8..c8de7dc 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -3470,6 +3470,12 @@ public class WANTestBase extends JUnit4DistributedTestCase { } } + public static void destroyAsyncEventQueue(String id) { + AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(id); + assertNotNull(aeq); + aeq.destroy(); + } + protected static void verifyListenerEvents(final long expectedNumEvents) { Awaitility.await().atMost(60, TimeUnit.SECONDS) .until(() -> listener1.getNumEvents() == expectedNumEvents); http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java index 03c0d62..880648d 100644 --- a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java @@ -14,21 +14,19 @@ */ package org.apache.geode.management; +import org.apache.geode.management.internal.SystemManagementService; import org.junit.experimental.categories.Category; import org.junit.Test; import static org.junit.Assert.*; import org.awaitility.Awaitility; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.test.junit.categories.FlakyTest; import java.util.Map; import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.geode.cache.Cache; @@ -36,7 +34,6 @@ import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.Locator; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.wan.WANTestBase; -import org.apache.geode.management.internal.MBeanJMXAdapter; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; @@ -215,16 +212,50 @@ public class WANManagementDUnitTest extends ManagementTestBase { 1, 100, false)); nyReceiver.invoke(() -> WANTestBase.createReceiver()); - checkAsyncQueueMBean(puneSender); - checkAsyncQueueMBean(managing); + checkAsyncQueueMBean(puneSender, true); + checkAsyncQueueMBean(managing, true); DistributedMember puneMember = (DistributedMember) puneSender.invoke(() -> WANManagementDUnitTest.getMember()); - checkProxyAsyncQueue(managing, puneMember); + checkProxyAsyncQueue(managing, puneMember, true); } + @Test + public void testCreateDestroyAsyncEventQueue() throws Exception { + VM memberVM = getManagedNodeList().get(2); + VM managerVm = getManagingNode(); + VM locatorVm = Host.getLocator(); + + int locatorPort = (Integer) locatorVm.invoke(() -> WANManagementDUnitTest.getLocatorPort()); + + memberVM.invoke(() -> WANTestBase.createCache(locatorPort)); + managerVm.invoke(() -> WANTestBase.createManagementCache(locatorPort)); + startManagingNode(managerVm); + + // Create AsyncEventQueue + String aeqId = "pn"; + memberVM.invoke( + () -> WANTestBase.createAsyncEventQueue(aeqId, false, 100, 100, false, false, null, false)); + managerVm.invoke( + () -> WANTestBase.createAsyncEventQueue(aeqId, false, 100, 100, false, false, null, false)); + + // Verify AsyncEventQueueMXBean exists + checkAsyncQueueMBean(memberVM, true); + checkAsyncQueueMBean(managerVm, true); + DistributedMember member = memberVM.invoke(() -> WANManagementDUnitTest.getMember()); + checkProxyAsyncQueue(managerVm, member, true); + + // Destroy AsyncEventQueue + memberVM.invoke(() -> WANTestBase.destroyAsyncEventQueue(aeqId)); + managerVm.invoke(() -> WANTestBase.destroyAsyncEventQueue(aeqId)); + + // Verify AsyncEventQueueMXBean no longer exists + checkAsyncQueueMBean(memberVM, false); + checkAsyncQueueMBean(managerVm, false); + checkProxyAsyncQueue(managerVm, member, false); + } @SuppressWarnings("serial") protected void checkSenderNavigationAPIS(final VM vm, final DistributedMember senderMember) { @@ -448,15 +479,18 @@ public class WANManagementDUnitTest extends ManagementTestBase { * @param vm reference to VM */ @SuppressWarnings("serial") - protected void checkAsyncQueueMBean(final VM vm) { + protected void checkAsyncQueueMBean(final VM vm, final boolean shouldExist) { SerializableRunnable checkAsyncQueueMBean = new SerializableRunnable("Check Async Queue MBean") { public void run() { Cache cache = GemFireCacheImpl.getInstance(); ManagementService service = ManagementService.getManagementService(cache); AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean("pn"); - assertNotNull(bean); - // Already in started State + if (shouldExist) { + assertNotNull(bean); + } else { + assertNull(bean); + } } }; vm.invoke(checkAsyncQueueMBean); @@ -468,28 +502,36 @@ public class WANManagementDUnitTest extends ManagementTestBase { * @param vm reference to VM */ @SuppressWarnings("serial") - protected void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember) { + protected void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember, + final boolean shouldExist) { SerializableRunnable checkProxyAsyncQueue = new SerializableRunnable("Check Proxy Async Queue") { public void run() { Cache cache = GemFireCacheImpl.getInstance(); - ManagementService service = ManagementService.getManagementService(cache); - AsyncEventQueueMXBean bean = null; - try { - bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn"); - } catch (Exception e) { - fail("Could not obtain Sender Proxy in desired time " + e); - } - assertNotNull(bean); + SystemManagementService service = + (SystemManagementService) ManagementService.getManagementService(cache); final ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(senderMember, "pn"); - - try { - MBeanUtil.printBeanDetails(queueMBeanName); - } catch (Exception e) { - fail("Error while Printing Bean Details " + e); + AsyncEventQueueMXBean bean = null; + if (shouldExist) { + // Verify the MBean proxy exists + try { + bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn"); + } catch (Exception e) { + fail("Could not obtain Sender Proxy in desired time " + e); + } + assertNotNull(bean); + + try { + MBeanUtil.printBeanDetails(queueMBeanName); + } catch (Exception e) { + fail("Error while Printing Bean Details " + e); + } + } else { + // Verify the MBean proxy doesn't exist + bean = service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class); + assertNull(bean); } - } }; vm.invoke(checkProxyAsyncQueue);
