This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6fa9bf6 GEODE-3956: Add async-event-queue-ids and gateway-sender-ids
to the RegionMBean listRegionAttributes operation (#1135)
6fa9bf6 is described below
commit 6fa9bf6c8a8f0c1ad8976593488be331dbae170f
Author: Jens Deppe <[email protected]>
AuthorDate: Fri Dec 8 06:20:30 2017 -0800
GEODE-3956: Add async-event-queue-ids and gateway-sender-ids to the
RegionMBean listRegionAttributes operation (#1135)
* GEODE-3956: Add async-event-queue-ids and gateway-sender-ids to the
RegionMBean listRegionAttributes operation
---
.../geode/management/RegionAttributesData.java | 32 +++++-
.../beans/RegionMBeanCompositeDataFactory.java | 23 ++---
.../internal/beans/RegionMBeanAttributesTest.java | 107 +++++++++++++++++++++
.../internal/beans/TestEventListener.java | 33 +++++++
.../geode/management/WANManagementDUnitTest.java | 47 ++++++---
5 files changed, 212 insertions(+), 30 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/management/RegionAttributesData.java
b/geode-core/src/main/java/org/apache/geode/management/RegionAttributesData.java
index 205546b..fc6ac08 100644
---
a/geode-core/src/main/java/org/apache/geode/management/RegionAttributesData.java
+++
b/geode-core/src/main/java/org/apache/geode/management/RegionAttributesData.java
@@ -16,6 +16,7 @@ package org.apache.geode.management;
import java.beans.ConstructorProperties;
import java.util.Arrays;
+import java.util.Set;
import org.apache.geode.cache.Region;
@@ -56,6 +57,8 @@ public class RegionAttributesData {
private boolean diskSynchronous;
private String compressorClassName;
private boolean offHeap;
+ private Set<String> eventQueueIds;
+ private Set<String> gatewaySenderIds;
/**
*
@@ -69,7 +72,7 @@ public class RegionAttributesData {
"concurrencyLevel", "indexMaintenanceSynchronous", "statisticsEnabled",
"subscriptionConflationEnabled", "asyncConflationEnabled", "poolName",
"cloningEnabled",
"diskStoreName", "interestPolicy", "diskSynchronous", "cacheListeners",
"compressorClassName",
- "offHeap"})
+ "offHeap", "eventQueueIds", "gatewaySenderIds"})
public RegionAttributesData(String cacheLoaderClassName, String
cacheWriterClassName,
@@ -81,9 +84,8 @@ public class RegionAttributesData {
boolean statisticsEnabled, boolean subscriptionConflationEnabled,
boolean asyncConflationEnabled, String poolName, boolean cloningEnabled,
String diskStoreName,
String interestPolicy, boolean diskSynchronous, String[] cacheListeners,
- String compressorClassName, boolean offHeap) {
-
-
+ String compressorClassName, boolean offHeap, Set<String> eventQueueIds,
+ Set<String> gatewaySenderIds) {
this.cacheLoaderClassName = cacheLoaderClassName;
this.cacheWriterClassName = cacheWriterClassName;
@@ -115,6 +117,8 @@ public class RegionAttributesData {
this.cacheListeners = cacheListeners;
this.compressorClassName = compressorClassName;
this.offHeap = offHeap;
+ this.eventQueueIds = eventQueueIds;
+ this.gatewaySenderIds = gatewaySenderIds;
}
/**
@@ -359,6 +363,24 @@ public class RegionAttributesData {
}
/**
+ * Returns the set of async event queue IDs.
+ *
+ * @return a set of ids.
+ */
+ public Set<String> getEventQueueIds() {
+ return eventQueueIds;
+ }
+
+ /**
+ * Returns the set of gateway sender IDs.
+ *
+ * @return a set of ids.
+ */
+ public Set<String> getGatewaySenderIds() {
+ return gatewaySenderIds;
+ }
+
+ /**
* String representation of RegionAttributesData
*/
@Override
@@ -379,7 +401,7 @@ public class RegionAttributesData {
+ regionIdleTimeout + ", regionTimeToLive=" + regionTimeToLive + ",
scope=" + scope
+ ", statisticsEnabled=" + statisticsEnabled + ",
subscriptionConflationEnabled="
+ subscriptionConflationEnabled + ", valueConstraintClassName=" +
valueConstraintClassName
- + "]";
+ + ", eventQueueIds=" + eventQueueIds + ", gatewaySenderIds=" +
gatewaySenderIds + "]";
}
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/RegionMBeanCompositeDataFactory.java
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/RegionMBeanCompositeDataFactory.java
index 99c9ff1..60537bf 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/RegionMBeanCompositeDataFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/RegionMBeanCompositeDataFactory.java
@@ -131,7 +131,7 @@ public class RegionMBeanCompositeDataFactory {
return fixedPartitionAttributesTable;
}
- public static RegionAttributesData getRegionAttributesData(RegionAttributes
regAttrs) {
+ public static RegionAttributesData
getRegionAttributesData(RegionAttributes<?, ?> regAttrs) {
String cacheLoaderClassName = null;
if (regAttrs.getCacheLoader() != null) {
@@ -214,16 +214,17 @@ public class RegionMBeanCompositeDataFactory {
boolean diskSynchronus = regAttrs.isDiskSynchronous();
boolean offheap = regAttrs.getOffHeap();
-
- RegionAttributesData regionAttributesData =
- new RegionAttributesData(cacheLoaderClassName, cacheWriteClassName,
keyConstraintClassName,
- valueContstraintClassName, regionTimeToLive, regionIdleTimeout,
entryTimeToLive,
- entryIdleTimeout, customEntryTimeToLive, customEntryIdleTimeout,
ignoreJTA, dataPolicy,
- scope, initialCapacity, loadFactor, lockGrantor, multicastEnabled,
concurrencyLevel,
- indexMaintenanceSynchronous, statisticsEnabled,
subsciptionConflationEnabled,
- asyncConflationEnabled, poolName, isCloningEnabled, diskStoreName,
interestPolicy,
- diskSynchronus, cacheListeners, compressorClassName, offheap);
-
+ Set<String> eventQueueIds = regAttrs.getAsyncEventQueueIds();
+ Set<String> gatewaySenderIds = regAttrs.getGatewaySenderIds();
+
+ RegionAttributesData regionAttributesData = new
RegionAttributesData(cacheLoaderClassName,
+ cacheWriteClassName, keyConstraintClassName,
valueContstraintClassName, regionTimeToLive,
+ regionIdleTimeout, entryTimeToLive, entryIdleTimeout,
customEntryTimeToLive,
+ customEntryIdleTimeout, ignoreJTA, dataPolicy, scope, initialCapacity,
loadFactor,
+ lockGrantor, multicastEnabled, concurrencyLevel,
indexMaintenanceSynchronous,
+ statisticsEnabled, subsciptionConflationEnabled,
asyncConflationEnabled, poolName,
+ isCloningEnabled, diskStoreName, interestPolicy, diskSynchronus,
cacheListeners,
+ compressorClassName, offheap, eventQueueIds, gatewaySenderIds);
return regionAttributesData;
}
diff --git
a/geode-core/src/test/java/org/apache/geode/management/internal/beans/RegionMBeanAttributesTest.java
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/RegionMBeanAttributesTest.java
new file mode 100644
index 0000000..af86a14
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/RegionMBeanAttributesTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.beans;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.management.RegionMXBean;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.MBeanServerConnectionRule;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+@Category(IntegrationTest.class)
+public class RegionMBeanAttributesTest {
+
+ private RegionMXBean bean;
+
+ @Rule
+ public RestoreSystemProperties restoreSystemProperties = new
RestoreSystemProperties();
+
+ @Rule
+ public GfshCommandRule gfsh = new GfshCommandRule();
+
+ @Rule // do not use a ClassRule since some test will do a shutdownMember
+ public ServerStarterRule server = new ServerStarterRule().withJMXManager()
+ .withRegion(RegionShortcut.REPLICATE, "FOO").withAutoStart();
+
+ @Rule
+ public MBeanServerConnectionRule mBeanRule = new MBeanServerConnectionRule();
+
+ @Before
+ public void setUp() throws Exception {
+ gfsh.connectAndVerify(server.getJmxPort(),
GfshCommandRule.PortType.jmxManager);
+ mBeanRule.connect(server.getJmxPort());
+ }
+
+ @Test
+ public void regionMBeanContainsEventQueueId() throws Exception {
+ gfsh.executeAndAssertThat(
+ "create async-event-queue --id=AEQ1 --listener=" +
TestEventListener.class.getName())
+ .statusIsSuccess();
+ gfsh.executeAndAssertThat("alter region --name=FOO
--async-event-queue-id=AEQ1")
+ .statusIsSuccess();
+
+ bean = mBeanRule.getProxyMBean(RegionMXBean.class);
+
+ assertThat(bean).isNotNull();
+ Set<String> eventQueueIds = bean.listRegionAttributes().getEventQueueIds();
+ assertThat(eventQueueIds).containsExactly("AEQ1");
+ }
+
+ @Test
+ public void removingEventQueueAlsoRemovesFromMBean() throws Exception {
+ gfsh.executeAndAssertThat(
+ "create async-event-queue --id=AEQ1 --listener=" +
TestEventListener.class.getName())
+ .statusIsSuccess();
+ gfsh.executeAndAssertThat("alter region --name=FOO
--async-event-queue-id=AEQ1")
+ .statusIsSuccess();
+
+ bean = mBeanRule.getProxyMBean(RegionMXBean.class);
+
+ assertThat(bean).isNotNull();
+ Set<String> eventQueueIds = bean.listRegionAttributes().getEventQueueIds();
+ assertThat(eventQueueIds).containsExactly("AEQ1");
+
+ gfsh.executeAndAssertThat("alter region --name=/FOO
--async-event-queue-id=").statusIsSuccess();
+
+ eventQueueIds = bean.listRegionAttributes().getEventQueueIds();
+ assertThat(eventQueueIds).containsExactly("");
+ }
+
+ @Test
+ public void regionMBeanContainsGatewaySenderId() throws Exception {
+ gfsh.executeAndAssertThat("create gateway-sender --id=SENDER1
--remote-distributed-system-id=1")
+ .statusIsSuccess();
+
+ server.waitTilGatewaySendersAreReady(1);
+
+ bean = mBeanRule.getProxyMBean(RegionMXBean.class);
+
+ assertThat(bean).isNotNull();
+ Set<String> gatewaySenderIds =
bean.listRegionAttributes().getGatewaySenderIds();
+ assertThat(gatewaySenderIds).containsExactly("");
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/management/internal/beans/TestEventListener.java
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/TestEventListener.java
new file mode 100644
index 0000000..4f85ef0
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/TestEventListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.beans;
+
+import java.util.List;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+
+public class TestEventListener implements AsyncEventListener {
+ @Override
+ public boolean processEvents(List<AsyncEvent> events) {
+ return true;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git
a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
index 1e40b0b..24c782d 100644
---
a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
+++
b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.management;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -59,17 +60,7 @@ public class WANManagementDUnitTest extends
ManagementTestBase {
}
@Test
- public void testMBeanCallbackSerial() throws Exception {
- testMBeanCallback(false);
- }
-
- @Test
- public void testMBeanCallbackParallel() throws Exception {
- testMBeanCallback(true);
-
- }
-
- public void testMBeanCallback(boolean parallel) throws Exception {
+ public void testMBeanCallback() throws Exception {
VM nyLocator = getManagedNodeList().get(0);
VM nyReceiver = getManagedNodeList().get(1);
@@ -81,13 +72,10 @@ public class WANManagementDUnitTest extends
ManagementTestBase {
Integer nyPort = nyLocator.invoke(() ->
WANTestBase.createFirstRemoteLocator(12, dsIdPort));
-
-
puneSender.invoke(() -> WANTestBase.createCache(dsIdPort));
managing.invoke(() -> WANTestBase.createManagementCache(dsIdPort));
startManagingNode(managing);
-
// keep a larger batch to minimize number of exception occurrences in the
// log
puneSender
@@ -255,6 +243,37 @@ public class WANManagementDUnitTest extends
ManagementTestBase {
checkProxyAsyncQueue(managerVm, member, false);
}
+ @Test
+ public void testDistributedRegionMBeanHasGatewaySenderIds() {
+ VM locator = Host.getLocator();
+ VM managing = getManagingNode();
+ VM sender = getManagedNodeList().get(0);
+
+ int dsIdPort = locator.invoke(() ->
WANManagementDUnitTest.getLocatorPort());
+
+ sender.invoke(() -> WANTestBase.createCache(dsIdPort));
+ managing.invoke(() -> WANTestBase.createManagementCache(dsIdPort));
+ startManagingNode(managing);
+
+ sender
+ .invoke(() -> WANTestBase.createSender("pn", 12, true, 100, 300,
false, false, null, true));
+
+ String regionName = getTestMethodName() + "_PR";
+ sender.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "pn",
0, 13, false));
+
+ String regionPath = "/" + regionName;
+ managing.invoke(() -> {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service =
ManagementService.getManagementService(cache);
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
assertNotNull(service.getDistributedRegionMXBean(regionPath)));
+
+ DistributedRegionMXBean bean =
service.getDistributedRegionMXBean(regionPath);
+
assertThat(bean.listRegionAttributes().getGatewaySenderIds()).containsExactly("pn");
+ });
+ }
+
@SuppressWarnings("serial")
protected void checkSenderNavigationAPIS(final VM vm, final
DistributedMember senderMember) {
SerializableRunnable checkNavigationAPIS =
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.