This is an automated email from the ASF dual-hosted git repository.
khowe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 29c71c9 GEODE-5281: replicate failure and fix bug (#2096)
29c71c9 is described below
commit 29c71c96f2933e8380e412e9d14b42beb30ae96b
Author: Helena Bales <[email protected]>
AuthorDate: Fri Jul 6 08:05:35 2018 -0700
GEODE-5281: replicate failure and fix bug (#2096)
* GEODE-5281: replicate failure and fix bug
- replicate the bad timing using a DUnit test
- fix the bug by only doing one lookup of MBeans
- add more tests
- fix potential NPE in waiting for regions to be ready on servers
Co-authored-by: Kenneth Howe <[email protected]>
* add TestingOnly annotation to setter
add a TestingOnly annotation to setter for ProxyFactory in the
FederatingManager for injecting test spies.
---
.../management/internal/FederatingManager.java | 5 +
.../geode/management/internal/MBeanJMXAdapter.java | 5 +
.../management/internal/MBeanProxyFactory.java | 15 +-
.../internal/MBeanProxyInvocationHandler.java | 6 +-
.../internal/JMXMBeanFederationDUnitTest.java | 189 +++++++++++++++++++++
.../MBeanFederationErrorPathDUnitTest.java | 128 ++++++++++++++
.../geode/test/junit/rules/MemberStarterRule.java | 3 +-
7 files changed, 339 insertions(+), 12 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
index c32f32a..fde2cd0 100755
---
a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
@@ -100,6 +100,11 @@ public class FederatingManager extends Manager {
this.messenger = new MemberMessenger(jmxAdapter, repo, system);
}
+ @TestingOnly
+ void setProxyFactory(MBeanProxyFactory newProxyFactory) {
+ this.proxyFactory = newProxyFactory;
+ }
+
/**
* This method will be invoked whenever a member wants to be a managing
node. The exception
* Management exception has to be handled by the caller.
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
index c6e4e2f..42a1050 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
@@ -97,6 +97,11 @@ public class MBeanJMXAdapter implements ManagementConstants {
newObjectName = ObjectName.getInstance(
OBJECTNAME__PREFIX + objectKeyProperty + KEYVAL_SEPARATOR +
"member=" + member);
}
+
+ if (isRegistered(newObjectName)) {
+ return newObjectName;
+ }
+
mbeanServer.registerMBean(object, newObjectName);
this.localGemFireMBean.put(newObjectName, object);
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyFactory.java
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyFactory.java
index da8ce09..212ac65 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyFactory.java
@@ -16,6 +16,7 @@ package org.apache.geode.management.internal;
import java.beans.IntrospectionException;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -78,14 +79,13 @@ public class MBeanProxyFactory {
Region<String, Object> monitoringRegion, Object newVal) {
try {
- String name = objectName.toString();
- FederationComponent federationComponent = (FederationComponent)
monitoringRegion.get(name);
+ FederationComponent federationComponent = (FederationComponent) newVal;
String interfaceClassName = federationComponent.getMBeanInterfaceClass();
Class interfaceClass = ClassLoadUtil.classFromName(interfaceClassName);
Object object = MBeanProxyInvocationHandler.newProxyInstance(member,
monitoringRegion,
- objectName, interfaceClass);
+ objectName, federationComponent, interfaceClass);
jmxAdapter.registerMBeanProxy(object, objectName);
@@ -123,19 +123,18 @@ public class MBeanProxyFactory {
logger.debug("Creating proxy for: {}", member.getId());
}
- Set<String> mbeanNames = monitoringRegion.keySet();
+ Set<Map.Entry<String, Object>> mbeans = monitoringRegion.entrySet();
- for (String mbeanName : mbeanNames) {
+ for (Map.Entry<String, Object> mbean : mbeans) {
ObjectName objectName = null;
try {
- objectName = ObjectName.getInstance(mbeanName);
+ objectName = ObjectName.getInstance(mbean.getKey());
if (logger.isDebugEnabled()) {
logger.debug("Creating proxy for ObjectName: " +
objectName.toString());
}
- createProxy(member, objectName, monitoringRegion,
- monitoringRegion.get(objectName.toString()));
+ createProxy(member, objectName, monitoringRegion, mbean.getValue());
} catch (Exception e) {
logger.warn("Create Proxy failed for {} with exception {}",
objectName, e.getMessage(), e);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyInvocationHandler.java
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyInvocationHandler.java
index 6c79e52..7855fff 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyInvocationHandler.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanProxyInvocationHandler.java
@@ -94,11 +94,11 @@ public class MBeanProxyInvocationHandler implements
InvocationHandler {
* @param interfaceClass on which interface the proxy to be exposed
*/
public static Object newProxyInstance(DistributedMember member,
- Region<String, Object> monitoringRegion, ObjectName objectName, Class
interfaceClass)
+ Region<String, Object> monitoringRegion, ObjectName objectName,
+ FederationComponent federationComponent, Class interfaceClass)
throws ClassNotFoundException, IntrospectionException {
boolean isMXBean = JMX.isMXBeanInterface(interfaceClass);
- boolean notificationBroadcaster =
- ((FederationComponent)
monitoringRegion.get(objectName.toString())).isNotificationEmitter();
+ boolean notificationBroadcaster =
federationComponent.isNotificationEmitter();
InvocationHandler handler =
new MBeanProxyInvocationHandler(member, objectName, monitoringRegion,
isMXBean);
diff --git
a/geode-core/src/test/java/org/apache/geode/management/internal/JMXMBeanFederationDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/management/internal/JMXMBeanFederationDUnitTest.java
new file mode 100644
index 0000000..e08975d
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/management/internal/JMXMBeanFederationDUnitTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.internal.InternalBlackboard;
+import org.apache.geode.test.dunit.internal.InternalBlackboardImpl;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.MBeanServerConnectionRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category({DistributedTest.class, JMXTest.class})
+public class JMXMBeanFederationDUnitTest {
+ private static final String LOCATOR_1_NAME = "locator-one";
+ private static final String LOCATOR_2_NAME = "locator-two";
+ private static final String REGION_PATH = "/test-region-1";
+ private static final int LOCATOR_1_VM_INDEX = 0;
+ private static final int LOCATOR_2_VM_INDEX = 4;
+ private static final int LOCATOR_COUNT = 1;
+ private static final int SERVER_1_VM_INDEX = 1;
+ private static final int SERVER_2_VM_INDEX = 2;
+ private static final int SERVER_3_VM_INDEX = 3;
+ private static int SERVER_COUNT = 2;
+
+ private int locator1JmxPort;
+ private int locator2JmxPort;
+
+ private MemberVM locator1, locator2, server1, server2, server3;
+
+ private InternalBlackboard bb;
+
+ @Rule
+ public ClusterStartupRule lsRule = new ClusterStartupRule();
+
+ @Rule
+ public GfshCommandRule gfsh = new GfshCommandRule();
+
+ @Rule
+ public MBeanServerConnectionRule jmxConnectionRule = new
MBeanServerConnectionRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void before() throws Exception {
+ locator1JmxPort =
AvailablePortHelper.getRandomAvailableTCPPorts(LOCATOR_COUNT)[0];
+ locator1 = lsRule.startLocatorVM(LOCATOR_1_VM_INDEX, locator1Properties());
+
+ server1 = lsRule.startServerVM(SERVER_1_VM_INDEX, locator1.getPort());
+ server2 = lsRule.startServerVM(SERVER_2_VM_INDEX, locator1.getPort());
+
+ gfsh.connectAndVerify(locator1);
+ gfsh.executeAndAssertThat(
+ "create region --type=REPLICATE --name=" + REGION_PATH + "
--enable-statistics=true")
+ .statusIsSuccess();
+ gfsh.disconnect();
+
+ locator1.waitTillRegionsAreReadyOnServers(REGION_PATH, SERVER_COUNT);
+
+ bb = InternalBlackboardImpl.getInstance();
+ }
+
+ @Test
+ public void MBeanFederationAddRemoveServer() throws IOException {
+ List<String> initialMBeans = getFederatedGemfireBeansFrom(locator1);
+
+ server3 = lsRule.startServerVM(SERVER_3_VM_INDEX, locator1.getPort());
+ SERVER_COUNT++;
+ locator1.waitTillRegionsAreReadyOnServers(REGION_PATH, SERVER_COUNT);
+ List keyset = server3.invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ DistributedMember member =
+
InternalDistributedSystem.getConnectedInstance().getDistributedMember();
+ String appender = MBeanJMXAdapter.getUniqueIDForMember(member);
+ Region monitoringRegion =
+ cache.getRegion(ManagementConstants.MONITORING_REGION + "_" +
appender);
+ List l = (List<String>)
monitoringRegion.keySet().stream().collect(Collectors.toList());
+ return l;
+ });
+
+ List<String> intermediateMBeans = getFederatedGemfireBeansFrom(locator1);
+ List<String> expectedMBeans = new ArrayList<>();
+ expectedMBeans.addAll(initialMBeans);
+ expectedMBeans.addAll(keyset);
+ expectedMBeans =
expectedMBeans.stream().sorted().collect(Collectors.toList());
+ intermediateMBeans =
intermediateMBeans.stream().sorted().collect(Collectors.toList());
+ assertThat(intermediateMBeans).containsExactlyElementsOf(expectedMBeans);
+
+ lsRule.stopMember(SERVER_3_VM_INDEX);
+ SERVER_COUNT--;
+ locator1.waitTillRegionsAreReadyOnServers(REGION_PATH, SERVER_COUNT);
+
+ List<String> finalMBeans = getFederatedGemfireBeansFrom(locator1);
+
+ assertThat(finalMBeans).containsExactlyElementsOf(initialMBeans);
+ }
+
+ private static List<String> getFederatedGemfireBeansFrom(MemberVM member)
+ throws IOException {
+ String url = jmxBeanLocalhostUrlString(member.getJmxPort());
+ MBeanServerConnection remoteMBS = connectToMBeanServer(url);
+ Set<ObjectName> allBeanNames = remoteMBS.queryNames(null, null);
+ // Each locator will have a "Manager" bean that is a part of the above
query,
+ // representing the ManagementAdapter.
+ // This bean is registered (and so included in its own queries),
+ // but *not* federated (and so is not included in another locator's bean
queries).
+ // For the scope of this test, we do not consider these "service=Manager"
beans.
+ Set<String> allBeans = new HashSet<>();
+ for (ObjectName bean : allBeanNames) {
+ allBeans.add(bean.toString());
+ }
+
+ return allBeans.stream()
+ .filter(b -> b.contains("GemFire"))
+ .sorted()
+ .collect(toList());
+ }
+
+ private static MBeanServerConnection connectToMBeanServer(String url) throws
IOException {
+ final JMXServiceURL serviceURL = new JMXServiceURL(url);
+ JMXConnector conn = JMXConnectorFactory.connect(serviceURL);
+ return conn.getMBeanServerConnection();
+ }
+
+ private static String jmxBeanLocalhostUrlString(int port) {
+ return "service:jmx:rmi:///jndi/rmi://localhost"
+ + ":" + port + "/jmxrmi";
+ }
+
+ private Properties locator1Properties() {
+ Properties props = new Properties();
+
props.setProperty(ConfigurationProperties.JMX_MANAGER_HOSTNAME_FOR_CLIENTS,
"localhost");
+ props.setProperty(ConfigurationProperties.JMX_MANAGER_PORT, "" +
locator1JmxPort);
+ props.setProperty(ConfigurationProperties.NAME, LOCATOR_1_NAME);
+ return props;
+ }
+
+ private Properties locator2Properties() {
+ locator2JmxPort =
AvailablePortHelper.getRandomAvailableTCPPorts(LOCATOR_COUNT)[0];
+ Properties props = new Properties();
+
props.setProperty(ConfigurationProperties.JMX_MANAGER_HOSTNAME_FOR_CLIENTS,
"localhost");
+ props.setProperty(ConfigurationProperties.JMX_MANAGER_PORT, "" +
locator2JmxPort);
+ props.setProperty(ConfigurationProperties.NAME, LOCATOR_2_NAME);
+ return props;
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java
new file mode 100644
index 0000000..26e083e
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/management/internal/MBeanFederationErrorPathDUnitTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.rmi.RemoteException;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.test.dunit.internal.InternalBlackboard;
+import org.apache.geode.test.dunit.internal.InternalBlackboardImpl;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.JMXTest;
+import org.apache.geode.test.junit.rules.LocatorStarterRule;
+
+@Category({DistributedTest.class, JMXTest.class})
+public class MBeanFederationErrorPathDUnitTest {
+ private static final int SERVER_1_VM_INDEX = 1;
+ private static final String REGION_NAME = "test-region-1";
+
+ public MemberVM server1, server2, server3;
+
+ @Rule
+ public LocatorStarterRule locator1 = new LocatorStarterRule();
+
+ @Rule
+ public ClusterStartupRule lsRule = new ClusterStartupRule();
+
+
+ private InternalBlackboard bb;
+
+ @Before
+ public void before() throws Exception {
+ locator1.withJMXManager().startLocator();
+
+ bb = InternalBlackboardImpl.getInstance();
+ }
+
+ @Test
+ public void destroyMBeanBeforeFederationCompletes()
+ throws MalformedObjectNameException, RemoteException {
+ String bbKey = "sync1";
+
+ String beanName =
"GemFire:service=Region,name=\"/test-region-1\",type=Member,member=server-1";
+ ObjectName objectName = new ObjectName(beanName);
+
+ InternalCache cache = locator1.getCache();
+ SystemManagementService service =
+ (SystemManagementService)
ManagementService.getManagementService(cache);
+ FederatingManager federatingManager = service.getFederatingManager();
+ MBeanProxyFactory mBeanProxyFactory = federatingManager.getProxyFactory();
+ MBeanProxyFactory spy = spy(mBeanProxyFactory);
+ service.getFederatingManager().setProxyFactory(spy);
+
+ Answer answer1 = new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ server1.invoke(() -> {
+ InternalCache serverCache = ClusterStartupRule.getCache();
+ Region region = serverCache.getRegionByPath("/" + REGION_NAME);
+ region.destroyRegion();
+ });
+
+ Region<String, Object> monitoringRegion = invocation.getArgument(2);
+ monitoringRegion.destroy(objectName.toString());
+
+ assertThat((monitoringRegion).get(objectName.toString())).isNull();
+
+ try {
+ invocation.callRealMethod();
+ } catch (Exception e) {
+ bb.setMailbox(bbKey, e);
+ return null;
+ }
+ bb.setMailbox(bbKey, "this is fine");
+ return null;
+ }
+ };
+
+ doAnswer(answer1).when(spy).createProxy(any(), eq(objectName), any(),
any());
+
+ server1 = lsRule.startServerVM(SERVER_1_VM_INDEX, locator1.getPort());
+
+ server1.invoke(() -> {
+ InternalCache cache1 = ClusterStartupRule.getCache();
+ cache1.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME);
+ });
+
+ Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() ->
bb.getMailbox(bbKey) != null);
+ Object e = bb.getMailbox("sync1");
+
+ assertThat(e).isNotInstanceOf(NullPointerException.class);
+ assertThat((String) e).contains("this is fine");
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index b42cba0..684f94b 100644
---
a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++
b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -264,7 +264,8 @@ public abstract class MemberStarterRule<T> extends
SerializableExternalResource
public void waitTillRegionIsReadyOnServers(String regionName, int
serverCount) {
await().atMost(30, TimeUnit.SECONDS).until(() ->
getRegionMBean(regionName) != null);
await().atMost(30, TimeUnit.SECONDS)
- .until(() -> getRegionMBean(regionName).getMembers().length ==
serverCount);
+ .until(() -> getRegionMBean(regionName).getMembers() != null
+ && getRegionMBean(regionName).getMembers().length == serverCount);
}
private long getDiskStoreCount(String diskStoreName) {