This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch release/1.9.2 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f49a4ac75fd163de68410c022e3598f93403655f Author: agingade <[email protected]> AuthorDate: Wed Sep 11 11:09:01 2019 -0700 GEODE-7149: Changes to support AsyncEventQueue's dispatcher status with AsyncEventQueue beans (#4029) Mbean changes to support AsyncEventQueue's pauseDispatcher() and resumeDispatcher() apis. --- .../geode/management/AsyncEventQueueMXBean.java | 8 + .../internal/beans/AsyncEventQueueMBean.java | 5 + .../internal/beans/AsyncEventQueueMBeanBridge.java | 4 + .../internal/beans/AsyncEventQueueMBeanTest.java | 52 +++ .../geode/management/AEQManagementDUnitTest.java | 415 +++++++++++++++++++++ 5 files changed, 484 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java b/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java index e6d68f7..6feb5c4 100644 --- a/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java +++ b/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java @@ -127,4 +127,12 @@ public interface AsyncEventQueueMXBean { * Returns the number of bytes overflowed to disk for this Sender. */ long getBytesOverflowedToDisk(); + + /** + * Returns the state of the event dispatcher. + * + * @return True if the dispatcher is paused, false otherwise. + */ + boolean isDispatchingPaused(); + } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java index bf7afd0..bd95a3a 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java @@ -124,4 +124,9 @@ public class AsyncEventQueueMBean extends NotificationBroadcasterSupport public void stopMonitor() { bridge.stopMonitor(); } + + @Override + public boolean isDispatchingPaused() { + return bridge.isDispatchingPaused(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java index b1a1caa..e09ab08 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java @@ -147,4 +147,8 @@ public class AsyncEventQueueMBeanBridge { public void stopMonitor() { monitor.stopListener(); } + + public boolean isDispatchingPaused() { + return queueImpl.isDispatchingPaused(); + } } diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanTest.java new file mode 100644 index 0000000..6901db4 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management.internal.beans; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.test.junit.categories.JMXTest; + +@Category(JMXTest.class) +public class AsyncEventQueueMBeanTest { + + private AsyncEventQueueMBean asyncEventQueueMBean; + + private AsyncEventQueueMBeanBridge asyncEventQueueMBeanBridge; + + @Test + public void asyncEvenQueueCreatedWithDispatcherNotPaused() { + asyncEventQueueMBeanBridge = mock(AsyncEventQueueMBeanBridge.class); + asyncEventQueueMBean = new AsyncEventQueueMBean(asyncEventQueueMBeanBridge); + + when(asyncEventQueueMBeanBridge.isDispatchingPaused()).thenReturn(false); + + assertThat(asyncEventQueueMBean.isDispatchingPaused()).isFalse(); + } + + @Test + public void asyncEvenQueueCreatedWithDispatcherPaused() { + asyncEventQueueMBeanBridge = mock(AsyncEventQueueMBeanBridge.class); + asyncEventQueueMBean = new AsyncEventQueueMBean(asyncEventQueueMBeanBridge); + + when(asyncEventQueueMBeanBridge.isDispatchingPaused()).thenReturn(true); + + assertThat(asyncEventQueueMBean.isDispatchingPaused()).isTrue(); + } +} diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/management/AEQManagementDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/management/AEQManagementDUnitTest.java new file mode 100644 index 0000000..5ce3038 --- /dev/null +++ b/geode-wan/src/distributedTest/java/org/apache/geode/management/AEQManagementDUnitTest.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management; + +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.Serializable; +import java.util.Properties; + +import javax.management.ObjectName; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.DiskStoreFactory; +import org.apache.geode.cache.asyncqueue.AsyncEventListener; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.Locator; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.wan.MyAsyncEventListener; +import org.apache.geode.management.internal.SystemManagementService; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.AEQTest; + +@Category({AEQTest.class}) +public class AEQManagementDUnitTest implements Serializable { + + private MemberVM locator, server1, server2; + + private String aeqID = "aeqMBeanTest"; + @Rule + public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(); + + public AEQManagementDUnitTest() { + super(); + } + + @Before + public void setup() { + locator = clusterStartupRule.startLocatorVM(0, locatorProperties()); + locator.invoke(() -> startManagerService()); + server1 = clusterStartupRule.startServerVM(1, locator.getPort()); + server2 = clusterStartupRule.startServerVM(2, locator.getPort()); + } + + private void startManagerService() { + SystemManagementService service = (SystemManagementService) ManagementService + .getManagementService(ClusterStartupRule.getCache()); + service.createManager(); + } + + @Test + public void testCreateAsyncEventQueueAndVerifyBeansRegistered() { + server1.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server1", false)); + + server2.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server2", false)); + + final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF = + () -> ClusterStartupRule.getCache().getMyId(); + + DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF); + + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer1); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1); + + assertThat(aeqBean).isNotNull(); + }); + + DistributedMember memberServer2 = server2.invoke( + memberSerializableCallableIF); + + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer2); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2); + + assertThat(aeqBean).isNotNull(); + }); + + } + + @Test + public void testDestroyAsyncEventQueueAndVerifyBeansAreUpdated() { + server1.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server1", false)); + + server2.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server2", false)); + + final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF = + () -> ClusterStartupRule.getCache().getMyId(); + + DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF); + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer1); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1); + + assertThat(aeqBean).isNotNull(); + }); + + + DistributedMember memberServer2 = server2.invoke(memberSerializableCallableIF); + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer2); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2); + + assertThat(aeqBean).isNotNull(); + }); + + server1.invoke(() -> { + destroyAsyncEventQueue(); + ManagementService mService = waitForAeqBeanToUnRegister(); + + assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNull(); + }); + + server2.invoke(() -> { + destroyAsyncEventQueue(); + ManagementService mService = waitForAeqBeanToUnRegister(); + + assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNull(); + }); + + + locator.invoke(() -> { + waitForProxyBeansRemoval(memberServer1); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1); + + assertThat(aeqBean).isNull(); + }); + + locator.invoke(() -> { + waitForProxyBeansRemoval(memberServer2); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2); + + assertThat(aeqBean).isNull(); + }); + + } + + @Test + public void testCreateAEQWithDispatcherInPausedStateAndVerifyUsingMBean() { + server1.invoke(() -> { + createAsyncEventQueueAndVerifyBeanRegisteration("server1", true); + + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + + server2.invoke(() -> { + createAsyncEventQueueAndVerifyBeanRegisteration("server2", true); + + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + + final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF = + () -> ClusterStartupRule.getCache().getMyId(); + + DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF); + + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer1); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + + DistributedMember memberServer2 = server2.invoke( + memberSerializableCallableIF); + + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer2); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + } + + @Test + public void testCreateAEQWithDispatcherInPausedStateAndResumeAndVerifyUsingMBean() { + server1.invoke(() -> { + createAsyncEventQueueAndVerifyBeanRegisteration("server1", true); + + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + + server2.invoke(() -> { + createAsyncEventQueueAndVerifyBeanRegisteration("server2", true); + + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + + final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF = + () -> ClusterStartupRule.getCache().getMyId(); + + DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF); + + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer1); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + + DistributedMember memberServer2 = server2.invoke( + memberSerializableCallableIF); + + locator.invoke(() -> { + waitForProxyBeansArrival(memberServer2); + + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2); + + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + }); + + server1.invoke(() -> { + AsyncEventQueue asyncEventQueue = ClusterStartupRule.getCache().getAsyncEventQueue(aeqID); + asyncEventQueue.resumeEventDispatching(); + + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + + GeodeAwaitility.await().untilAsserted(() -> { + AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID); + assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNotNull(); + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(false); + }); + }); + + locator.invoke(() -> GeodeAwaitility.await().untilAsserted(() -> { + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1); + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true); + })); + + server2.invoke(() -> { + AsyncEventQueue asyncEventQueue = ClusterStartupRule.getCache().getAsyncEventQueue(aeqID); + asyncEventQueue.resumeEventDispatching(); + + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + + GeodeAwaitility.await().untilAsserted(() -> { + AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID); + assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNotNull(); + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(false); + }); + }); + + locator.invoke(() -> GeodeAwaitility.await().untilAsserted(() -> { + AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2); + assertThat(aeqBean.isDispatchingPaused()).isEqualTo(false); + })); + + } + + private AsyncEventQueueMXBean getAsyncEventQueueMXBean(DistributedMember memberServer1) { + SystemManagementService service = + (SystemManagementService) SystemManagementService + .getManagementService(ClusterStartupRule.getCache()); + ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(memberServer1, aeqID); + return service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class); + } + + private void createAsyncEventQueueAndVerifyBeanRegisteration(String diskStoreName, + boolean dispatcherPaused) { + createAsyncEventQueue(aeqID, false, 100, 100, false, false, + diskStoreName, false, dispatcherPaused); + waitForAeqBeanToRegister(); + } + + private void destroyAsyncEventQueue() { + AsyncEventQueueImpl aeq = + (AsyncEventQueueImpl) ClusterStartupRule.getCache().getAsyncEventQueue(aeqID); + assertThat(aeq).isNotNull(); + aeq.destroy(); + } + + private void waitForProxyBeansArrival(DistributedMember member) { + GeodeAwaitility.await().untilAsserted(() -> { + SystemManagementService service = + (SystemManagementService) SystemManagementService.getManagementService( + ClusterStartupRule.getCache()); + ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(member, aeqID); + assertThat(queueMBeanName).isNotNull(); + assertThat(service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class)).isNotNull(); + }); + } + + private void waitForProxyBeansRemoval(DistributedMember member) { + GeodeAwaitility.await().untilAsserted(() -> { + SystemManagementService service = + (SystemManagementService) SystemManagementService.getManagementService( + ClusterStartupRule.getCache()); + ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(member, aeqID); + assertThat(service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class)).isNull(); + }); + } + + private static int getLocatorPort() { + return Locator.getLocators().get(0).getPort(); + } + + private ManagementService waitForAeqBeanToRegister() { + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + + GeodeAwaitility.await().untilAsserted(() -> { + assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNotNull(); + }); + + return mService; + } + + private ManagementService waitForAeqBeanToUnRegister() { + ManagementService mService = + ManagementService.getManagementService(ClusterStartupRule.getCache()); + + GeodeAwaitility.await().untilAsserted(() -> { + assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNull(); + }); + + return mService; + } + + private Properties locatorProperties() { + int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort(); + Properties props = new Properties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOG_LEVEL, "fine"); + props.setProperty(ConfigurationProperties.JMX_MANAGER_HOSTNAME_FOR_CLIENTS, "localhost"); + props.setProperty(ConfigurationProperties.JMX_MANAGER_PORT, "" + jmxPort); + + return props; + } + + public void createAsyncEventQueue(String asyncChannelId, boolean isParallel, + Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, + String diskStoreName, boolean isDiskSynchronous, boolean dispatcherPaused) { + + if (diskStoreName != null) { + File directory = new File( + asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] {directory}; + DiskStoreFactory dsf = ClusterStartupRule.getCache().createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + dsf.create(diskStoreName); + } + + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); + + AsyncEventQueueFactory factory = ClusterStartupRule.getCache().createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setPersistent(isPersistent); + factory.setDiskStoreName(diskStoreName); + factory.setDiskSynchronous(isDiskSynchronous); + factory.setBatchConflationEnabled(isConflation); + factory.setMaximumQueueMemory(maxMemory); + factory.setParallel(isParallel); + factory.setDispatcherThreads(3); + if (dispatcherPaused) { + factory.pauseEventDispatching(); + } + factory.create(asyncChannelId, asyncEventListener); + } + +}
