This is an automated email from the ASF dual-hosted git repository.
jonmeredith pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new c6d7d070c5 IsolatedJMX should not release all TCPEndpoints on instance
shutdown
c6d7d070c5 is described below
commit c6d7d070c59d81db8949683d3e5670b909efb48c
Author: Doug Rohrer <[email protected]>
AuthorDate: Wed Aug 23 18:03:30 2023 -0600
IsolatedJMX should not release all TCPEndpoints on instance shutdown
patch by Doug Rohrer; reviewed by Jon Meredith, Stefan Miklosovic for
CASSANDRA-18725
---
build.xml | 2 +-
.../utils/RMIClientSocketFactoryImpl.java | 23 ++-
.../apache/cassandra/utils/ReflectionUtils.java | 101 +++++++++
.../cassandra/distributed/impl/Instance.java | 5 +-
.../cassandra/distributed/impl/IsolatedJmx.java | 95 ++++-----
.../cassandra/distributed/shared/ClusterUtils.java | 226 +++++++++++++++++++++
.../distributed/shared/WithProperties.java | 112 ++++++++++
.../distributed/test/ResourceLeakTest.java | 54 ++---
.../distributed/test/jmx/JMXFeatureTest.java | 57 +++++-
.../distributed/test/jmx/JMXGetterCheckTest.java | 48 +++--
10 files changed, 627 insertions(+), 96 deletions(-)
diff --git a/build.xml b/build.xml
index c634761566..70a195f98d 100644
--- a/build.xml
+++ b/build.xml
@@ -383,7 +383,7 @@
<exclusion groupId="org.hamcrest" artifactId="hamcrest-core"/>
</dependency>
<dependency groupId="org.mockito" artifactId="mockito-core"
version="3.2.4" scope="test"/>
- <dependency groupId="org.apache.cassandra" artifactId="dtest-api"
version="0.0.15" scope="test"/>
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api"
version="0.0.16" scope="test"/>
<dependency groupId="org.reflections" artifactId="reflections"
version="0.10.2" scope="test"/>
<dependency groupId="org.quicktheories" artifactId="quicktheories"
version="0.25" scope="test"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core"
version="1.0.3" scope="provided">
diff --git
a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
index 62ab88fb92..c04e934fdc 100644
--- a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
+++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.utils;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
+import java.net.ServerSocket;
import java.net.Socket;
import java.rmi.server.RMIClientSocketFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
/**
@@ -32,6 +35,7 @@ import java.util.Objects;
*/
public class RMIClientSocketFactoryImpl implements RMIClientSocketFactory,
Serializable
{
+ List<Socket> sockets = new ArrayList<>();
private final InetAddress localAddress;
public RMIClientSocketFactoryImpl(InetAddress localAddress)
@@ -42,7 +46,24 @@ public class RMIClientSocketFactoryImpl implements
RMIClientSocketFactory, Seria
@Override
public Socket createSocket(String host, int port) throws IOException
{
-        return new Socket(localAddress, port);
+        Socket socket = new Socket(localAddress, port);
+        // createSocket() and close() may run on different threads (RMI runtime vs. instance shutdown),
+        // so every access to the tracked sockets is guarded by the list's monitor
+        synchronized (sockets)
+        {
+            sockets.add(socket);
+        }
+        return socket;
+    }
+
+    /**
+     * Best-effort close of every socket created by this factory instance; a failure closing an individual
+     * socket is ignored so the remaining sockets are still closed. The sockets are then forgotten, so they
+     * are neither kept reachable nor closed again by a later call.
+     */
+    public void close() throws IOException
+    {
+        synchronized (sockets)
+        {
+            for (Socket socket : sockets)
+            {
+                try
+                {
+                    socket.close();
+                }
+                catch (IOException ignored)
+                {
+                    // intentionally ignored - best-effort cleanup
+                }
+            }
+            sockets.clear();
+        }
}
@Override
diff --git a/src/java/org/apache/cassandra/utils/ReflectionUtils.java
b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
new file mode 100644
index 0000000000..3d1cc29565
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Reflection helpers, used in particular by the in-jvm dtest framework to reach private JDK state.
+ */
+public class ReflectionUtils
+{
+    /**
+     * Look up a field declared by {@code clazz} (inherited fields are not searched).
+     * <p>
+     * {@link Class#getDeclaredField(String)} is tried first. Newer JDKs hide some fields from that lookup
+     * (see https://bugs.openjdk.org/browse/JDK-8210522), so on failure this falls back to the JDK-internal
+     * {@code Class.getDeclaredFields0(boolean)}, which returns the unfiltered declared fields.
+     *
+     * @param clazz     the class declaring the field
+     * @param fieldName the field name
+     * @return the field; it is not made accessible by this method
+     * @throws NoSuchFieldException if neither lookup finds the field; a failure of the fallback lookup is
+     *                              attached to it as a suppressed exception
+     */
+    public static Field getField(Class<?> clazz, String fieldName) throws NoSuchFieldException
+    {
+        // below code works before Java 12
+        try
+        {
+            return clazz.getDeclaredField(fieldName);
+        }
+        catch (NoSuchFieldException e)
+        {
+            // this is mitigation for JDK 17 (https://bugs.openjdk.org/browse/JDK-8210522)
+            try
+            {
+                Method getDeclaredFields0 = Class.class.getDeclaredMethod("getDeclaredFields0", boolean.class);
+                getDeclaredFields0.setAccessible(true);
+                Field[] fields = (Field[]) getDeclaredFields0.invoke(clazz, false);
+                for (Field field : fields)
+                {
+                    if (fieldName.equals(field.getName()))
+                    {
+                        return field;
+                    }
+                }
+            }
+            catch (ReflectiveOperationException ex)
+            {
+                e.addSuppressed(ex);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Used by the in-jvm dtest framework to remove entries from private map fields that otherwise would prevent
+     * collection of classloaders (which causes metaspace OOMs) or otherwise interfere with instance restart.
+     * <p>
+     * Removal happens while holding the map's monitor. That only excludes concurrent modification by code that
+     * also synchronizes on the same map. If the field currently holds {@code null} there is nothing to clear and
+     * the method returns without doing anything.
+     *
+     * @param clazz        The class which has the map field to clear
+     * @param instance     an instance of the class to clear (pass null for a static member)
+     * @param mapName      the name of the map field to clear
+     * @param shouldRemove a predicate which determines if the entry in question should be removed
+     * @param <K>          The type of the map key
+     * @param <V>          The type of the map value
+     * @throws RuntimeException if the field cannot be found or read
+     */
+    public static <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName, Predicate<Map.Entry<K, V>> shouldRemove)
+    {
+        try
+        {
+            Field mapField = getField(clazz, mapName);
+            mapField.setAccessible(true);
+            // The caller chooses K and V to match the field's declared type; the cast cannot be checked at runtime
+            @SuppressWarnings("unchecked")
+            Map<K, V> map = (Map<K, V>) mapField.get(instance);
+            // An unset field has no entries to remove, and synchronizing on null would throw an NPE
+            if (map == null)
+                return;
+            // Because multiple instances can be shutting down at once,
+            // synchronize on the map to avoid ConcurrentModificationException
+            synchronized (map)
+            {
+                // This could be done with a simple `map.entrySet.removeIf()` call
+                // but for debugging purposes it is much easier to keep it like this.
+                Iterator<Map.Entry<K, V>> it = map.entrySet().iterator();
+                while (it.hasNext())
+                {
+                    Map.Entry<K, V> entry = it.next();
+                    if (shouldRemove.test(entry))
+                    {
+                        it.remove();
+                    }
+                }
+            }
+        }
+        catch (NoSuchFieldException | IllegalAccessException ex)
+        {
+            throw new RuntimeException(String.format("Could not clear map field %s in class %s", mapName, clazz), ex);
+        }
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index cca6296629..bbf98ca022 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -614,17 +614,18 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
initialized = true;
}
-    private void startJmx()
+    private synchronized void startJmx()
{
isolatedJmx = new IsolatedJmx(this, inInstancelogger);
isolatedJmx.startJmx();
}
-    private void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+    /**
+     * Stops the JMX server when JMX is enabled and startJmx() has created an IsolatedJmx, then drops the
+     * reference. Calling it again, or without a prior startJmx(), does nothing.
+     */
+    private synchronized void stopJmx()
{
-        if (config.has(JMX))
+        // isolatedJmx is assigned in startJmx() and cleared below; without this guard a repeated stop,
+        // or a stop without a start, would throw a NullPointerException
+        if (config.has(JMX) && isolatedJmx != null)
{
isolatedJmx.stopJmx();
+            isolatedJmx = null;
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
index b3d06590a9..e586cae58d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
@@ -18,14 +18,13 @@
package org.apache.cassandra.distributed.impl;
-import java.lang.reflect.Field;
+import java.io.IOException;
import java.net.InetAddress;
-import java.net.MalformedURLException;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXServiceURL;
import javax.management.remote.rmi.RMIConnectorServer;
@@ -34,6 +33,8 @@ import javax.management.remote.rmi.RMIJRMPServerImpl;
import org.slf4j.Logger;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
import org.apache.cassandra.utils.JMXServerUtils;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.RMIClientSocketFactoryImpl;
@@ -41,6 +42,7 @@ import sun.rmi.transport.tcp.TCPEndpoint;
import static org.apache.cassandra.distributed.api.Feature.JMX;
import static
org.apache.cassandra.utils.MBeanWrapper.IS_DISABLED_MBEAN_REGISTRATION;
+import static org.apache.cassandra.utils.ReflectionUtils.clearMapField;
public class IsolatedJmx
{
@@ -48,7 +50,8 @@ public class IsolatedJmx
private static final String SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME =
"sun.rmi.transport.tcp.threadKeepAliveTime";
/** Controls the distributed garbage collector lease time for JMX objects.
*/
private static final String JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST
="java.rmi.dgc.leaseValue";
- private static final int RMI_KEEPALIVE_TIME = 1000;
+ public static final int RMI_KEEPALIVE_TIME = 1000;
+ public static final String UNKNOWN_JMX_CONNECTION_ERROR = "Could not
connect to JMX due to an unknown error";
private JMXConnectorServer jmxConnectorServer;
private JMXServerUtils.JmxRegistry registry;
@@ -59,7 +62,8 @@ public class IsolatedJmx
private Logger inInstancelogger;
private IInstanceConfig config;
- public IsolatedJmx(Instance instance, Logger inInstancelogger) {
+ /**
+ * Captures the instance's config and in-instance logger; no JMX server is started here
+ * (see {@link #startJmx()}).
+ *
+ * @param instance the instance whose config determines whether and how JMX is started
+ * @param inInstancelogger logger used for JMX start/stop messages
+ */
+ public IsolatedJmx(Instance instance, Logger inInstancelogger)
+ {
this.inInstancelogger = inInstancelogger;
config = instance.config();
}
@@ -127,45 +131,29 @@ public class IsolatedJmx
registry.setRemoteServerStub(jmxRmiServer.toStub());
JMXServerUtils.logJmxServiceUrl(addr, jmxPort);
- waitForJmxAvailability(hostname, jmxPort, env);
+ waitForJmxAvailability(env);
}
- catch (Throwable e)
+ catch (Throwable t)
{
- throw new RuntimeException("Feature.JMX was enabled but could not
be started.", e);
+ throw new RuntimeException("Feature.JMX was enabled but could not
be started.", t);
}
}
-
- private void waitForJmxAvailability(String hostname, int rmiPort,
Map<String, Object> env) throws InterruptedException, MalformedURLException
+ /**
+ * Blocks until a JMX client can connect to this instance's JMX server, then closes that connection.
+ * Retrying is delegated to JMXUtil.getJmxConnector; the second argument (20) is presumably the
+ * number of connection attempts - confirm against JMXUtil.
+ *
+ * @param env the JMX environment passed to the connector
+ * @throws RuntimeException wrapping an IOException from connecting (the error is also logged)
+ */
+ private void waitForJmxAvailability(Map<String, ?> env)
{
- String url =
String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort);
- JMXServiceURL serviceURL = new JMXServiceURL(url);
- int attempts = 0;
- Throwable lastThrown = null;
- while (attempts < 20)
+ try (JMXConnector ignored = JMXUtil.getJmxConnector(config, 20, env))
{
- attempts++;
- try (JMXConnector ignored =
JMXConnectorFactory.connect(serviceURL, env))
- {
- inInstancelogger.info("Connected to JMX server at {} after {}
attempt(s)",
- url, attempts);
- return;
- }
- catch (MalformedURLException e)
- {
- throw new RuntimeException(e);
- }
- catch (Throwable thrown)
- {
- lastThrown = thrown;
- }
- inInstancelogger.info("Could not connect to JMX on {} after {}
attempts. Will retry.", url, attempts);
- Thread.sleep(1000);
+ // Do nothing - JMXUtil now retries
+ }
+ catch (IOException iex)
+ {
+ // If we land here, there's something more than a timeout
+ inInstancelogger.error(UNKNOWN_JMX_CONNECTION_ERROR, iex);
+ throw new RuntimeException(UNKNOWN_JMX_CONNECTION_ERROR, iex);
}
- throw new RuntimeException("Could not start JMX - unreachable after 20
attempts", lastThrown);
}
- public void stopJmx() throws IllegalAccessException, NoSuchFieldException,
InterruptedException
+ public void stopJmx()
{
if (!config.has(JMX))
return;
@@ -198,6 +186,14 @@ public class IsolatedJmx
inInstancelogger.warn("failed to close registry.", e);
}
try
+ {
+ clientSocketFactory.close();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close clientSocketFactory.", e);
+ }
+ try
{
serverSocketFactory.close();
}
@@ -207,25 +203,18 @@ public class IsolatedJmx
}
// The TCPEndpoint class holds references to a class in the in-jvm
dtest framework
// which transitively has a reference to the InstanceClassLoader, so
we need to
- // make sure to remove the reference to them when the instance is
shutting down
- clearMapField(TCPEndpoint.class, null, "localEndpoints");
- Thread.sleep(2 * RMI_KEEPALIVE_TIME); // Double the keep-alive time to
give Distributed GC some time to clean up
+ // make sure to remove the reference to them when the instance is
shutting down.
+ // Additionally, we must make sure to only clear endpoints created by
this instance
+ // As clearning the entire map can cause issues with starting and
stopping nodes mid-test.
+ clearMapField(TCPEndpoint.class, null, "localEndpoints",
this::endpointCreateByThisInstance);
+ Uninterruptibles.sleepUninterruptibly(2 * RMI_KEEPALIVE_TIME,
TimeUnit.MILLISECONDS); // Double the keep-alive time to give Distributed GC
some time to clean up
}
- private <K, V> void clearMapField(Class<?> clazz, Object instance, String
mapName)
- throws IllegalAccessException, NoSuchFieldException {
- Field mapField = clazz.getDeclaredField(mapName);
- mapField.setAccessible(true);
- Map<K, V> map = (Map<K, V>) mapField.get(instance);
- // Because multiple instances can be shutting down at once,
- // synchronize on the map to avoid ConcurrentModificationException
- synchronized (map)
- {
- for (Iterator<Map.Entry<K, V>> it = map.entrySet().iterator();
it.hasNext(); )
- {
- it.next();
- it.remove();
- }
- }
+ /**
+ * Predicate for clearMapField(TCPEndpoint.class, null, "localEndpoints", ...) in stopJmx().
+ * It selects a localEndpoints entry when any TCPEndpoint in its list uses both this object's server
+ * socket factory and its client socket factory (compared by reference). Entries for endpoints using
+ * other factories (e.g. another instance's) are therefore left in place.
+ */
+ private boolean endpointCreateByThisInstance(Map.Entry<Object,
LinkedList<TCPEndpoint>> entry)
+ {
+ return entry.getValue()
+ .stream()
+ .anyMatch(ep -> ep.getServerSocketFactory() ==
this.serverSocketFactory &&
+ ep.getClientSocketFactory() ==
this.clientSocketFactory);
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
new file mode 100644
index 0000000000..3e9798ea02
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.util.concurrent.Futures;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+
+import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * Helpers for in-jvm dtests: starting/stopping instances and waiting on ring state via {@code nodetool ring}.
+ */
+public class ClusterUtils
+{
+    /**
+     * Matches one node line of {@code nodetool ring} output, capturing address, rack, status, state and token
+     * (groups 1-5). Compiled once because the await helpers re-parse the ring on every poll.
+     */
+    private static final Pattern RING_LINE_PATTERN =
+        Pattern.compile("^(/?[0-9.:]+)\\s+(\\w+|\\?)\\s+(\\w+|\\?)\\s+(\\w+|\\?).*?(-?\\d+)\\s*$");
+
+    /**
+     * Start the instance with the given System Properties. Once startup returns (or fails), every property set
+     * through {@code fn} is restored to its previous value, or cleared if it was previously unset.
+     */
+    public static <I extends IInstance> I start(I inst, Consumer<WithProperties> fn)
+    {
+        return start(inst, (ignore, prop) -> fn.accept(prop));
+    }
+
+    /**
+     * Start the instance with the given System Properties; {@code fn} also receives the instance being started.
+     * Once startup returns (or fails), every property set through {@code fn} is restored to its previous value,
+     * or cleared if it was previously unset.
+     */
+    public static <I extends IInstance> I start(I inst, BiConsumer<I, WithProperties> fn)
+    {
+        try (WithProperties properties = new WithProperties())
+        {
+            fn.accept(inst, properties);
+            inst.startup();
+            return inst;
+        }
+    }
+
+    /**
+     * Stop an instance in a blocking manner.
+     *
+     * The main difference between this and {@link IInstance#shutdown()} is that the wait on the future will catch
+     * the exceptions and throw as runtime.
+     */
+    public static void stopUnchecked(IInstance i)
+    {
+        Futures.getUnchecked(i.shutdown());
+    }
+
+    /**
+     * Wait for the ring, as seen from {@code instance}, to list {@code expectedInRing} with the given status
+     * column value (e.g. "Up" or "Down").
+     *
+     * @param instance       instance to run {@code nodetool ring} on
+     * @param expectedInRing instance to look for
+     * @param status         expected status
+     * @return the ring that satisfied the condition
+     * @throws AssertionError if the status is not reached within the polling budget of awaitRing
+     */
+    public static List<RingInstanceDetails> awaitRingStatus(IInstance instance, IInstance expectedInRing, String status)
+    {
+        return awaitInstanceMatching(instance, expectedInRing, d -> d.status.equals(status),
+                                     "Timeout waiting for " + expectedInRing + " to have status " + status);
+    }
+
+    /**
+     * Wait for the ring to have the target instance with the provided state.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing to look for
+     * @param state expected
+     * @return the ring
+     * @throws AssertionError if the state is not reached within the polling budget of awaitRing
+     */
+    public static List<RingInstanceDetails> awaitRingState(IInstance instance, IInstance expectedInRing, String state)
+    {
+        return awaitInstanceMatching(instance, expectedInRing, d -> d.state.equals(state),
+                                     "Timeout waiting for " + expectedInRing + " to have state " + state);
+    }
+
+    /**
+     * Wait until the ring seen from {@code instance} contains a line for {@code expectedInRing}'s broadcast
+     * address that satisfies {@code predicate}.
+     */
+    private static List<RingInstanceDetails> awaitInstanceMatching(IInstance instance,
+                                                                   IInstance expectedInRing,
+                                                                   Predicate<RingInstanceDetails> predicate,
+                                                                   String errorMessage)
+    {
+        // The target address does not change between polls, so resolve it once up front.
+        // NOTE(review): addresses parsed from output such as "/127.0.0.1:7012" keep the slash and port and
+        // will not equal this plain host address - confirm the nodetool format used by this branch.
+        String expectedAddress = getBroadcastAddressHostString(expectedInRing);
+        return awaitRing(instance,
+                         errorMessage,
+                         ring -> ring.stream()
+                                     .filter(d -> d.address.equals(expectedAddress))
+                                     .anyMatch(predicate));
+    }
+
+    /**
+     * Poll the ring from {@code src} once a second, up to 100 times, until {@code fn} accepts it.
+     *
+     * @return the first ring accepted by {@code fn}
+     * @throws AssertionError carrying {@code errorMessage} and the last ring seen if {@code fn} never accepts it
+     */
+    private static List<RingInstanceDetails> awaitRing(IInstance src, String errorMessage, Predicate<List<RingInstanceDetails>> fn)
+    {
+        List<RingInstanceDetails> ring = null;
+        for (int i = 0; i < 100; i++)
+        {
+            ring = ring(src);
+            if (fn.test(ring))
+            {
+                return ring;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + ring);
+    }
+
+    /**
+     * Get the broadcast address host address only (ex. 127.0.0.1)
+     */
+    private static String getBroadcastAddressHostString(IInstance target)
+    {
+        return target.config().broadcastAddress().getAddress().getHostAddress();
+    }
+
+    /**
+     * Get the ring from the perspective of the instance.
+     * Asserts that {@code nodetool ring} succeeded before parsing its output.
+     */
+    public static List<RingInstanceDetails> ring(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("ring");
+        results.asserts().success();
+        return parseRing(results.getStdout());
+    }
+
+    /**
+     * Parse {@code nodetool ring} output; lines that do not look like a node line (e.g. headers) are skipped.
+     */
+    private static List<RingInstanceDetails> parseRing(String str)
+    {
+        // Example node lines:
+        // 127.0.0.3 rack0 Up Normal 46.21 KB 100.00% -1
+        // /127.0.0.1:7012 Unknown ? Normal ? 100.00% -3074457345618258603
+        List<RingInstanceDetails> details = new ArrayList<>();
+        String[] lines = str.split("\n");
+        for (String line : lines)
+        {
+            Matcher matcher = RING_LINE_PATTERN.matcher(line);
+            if (!matcher.find())
+            {
+                continue;
+            }
+            details.add(new RingInstanceDetails(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4), matcher.group(5)));
+        }
+
+        return details;
+    }
+
+    /**
+     * One node line of {@code nodetool ring} output: address, rack, status, state and token as printed.
+     */
+    public static final class RingInstanceDetails
+    {
+        private final String address;
+        private final String rack;
+        private final String status;
+        private final String state;
+        private final String token;
+
+        private RingInstanceDetails(String address, String rack, String status, String state, String token)
+        {
+            this.address = address;
+            this.rack = rack;
+            this.status = status;
+            this.state = state;
+            this.token = token;
+        }
+
+        public String getAddress()
+        {
+            return address;
+        }
+
+        public String getRack()
+        {
+            return rack;
+        }
+
+        public String getStatus()
+        {
+            return status;
+        }
+
+        public String getState()
+        {
+            return state;
+        }
+
+        public String getToken()
+        {
+            return token;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RingInstanceDetails that = (RingInstanceDetails) o;
+            return Objects.equals(address, that.address) &&
+                   Objects.equals(rack, that.rack) &&
+                   Objects.equals(status, that.status) &&
+                   Objects.equals(state, that.state) &&
+                   Objects.equals(token, that.token);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(address, rack, status, state, token);
+        }
+
+        @Override
+        public String toString()
+        {
+            return Arrays.asList(address, rack, status, state, token).toString();
+        }
+    }
+
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
new file mode 100644
index 0000000000..a2aeae27ac
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Joiner;
+
+public final class WithProperties implements AutoCloseable
+{
+ private final List<Runnable> rollback = new ArrayList<>();
+
+ public WithProperties()
+ {
+ }
+
+ public WithProperties with(String... kvs)
+ {
+ assert kvs.length % 2 == 0 : "Input must have an even amount of inputs
but given " + kvs.length;
+ for (int i = 0; i <= kvs.length - 2; i = i + 2)
+ with(kvs[i], kvs[i + 1]);
+ return this;
+ }
+
+ public WithProperties set(String prop, String value)
+ {
+ return set(prop, () -> System.setProperty(prop, value));
+ }
+
+ public WithProperties set(String prop, String... values)
+ {
+ return set(prop, Arrays.asList(values));
+ }
+
+ public WithProperties set(String prop, Collection<String> values)
+ {
+ return set(prop, Joiner.on(",").join(values));
+ }
+
+ public WithProperties set(String prop, boolean value)
+ {
+ return set(prop, () -> System.setProperty(prop,
convertToString(value)));
+ }
+
+ public WithProperties set(String prop, long value)
+ {
+ return set(prop, () -> System.setProperty(prop,
convertToString(value)));
+ }
+
+ private void with(String key, String value)
+ {
+ String previous = System.setProperty(key, value); // checkstyle:
suppress nearby 'blockSystemPropertyUsage'
+ rollback.add(previous == null ? () -> System.clearProperty(key) : ()
-> System.setProperty(key, previous)); // checkstyle: suppress nearby
'blockSystemPropertyUsage'
+ }
+
+ private WithProperties set(String prop, Supplier<Object> prev)
+ {
+ String previous = convertToString(prev.get());
+ rollback.add(previous == null ? () -> System.clearProperty(prop) : ()
-> System.setProperty(prop, previous));
+ return this;
+ }
+
+ @Override
+ public void close()
+ {
+ Collections.reverse(rollback);
+ rollback.forEach(Runnable::run);
+ rollback.clear();
+ }
+
+ public static String convertToString(@Nullable Object value)
+ {
+ if (value == null)
+ return null;
+ if (value instanceof String)
+ return (String) value;
+ if (value instanceof Boolean)
+ return Boolean.toString((Boolean) value);
+ if (value instanceof Long)
+ return Long.toString((Long) value);
+ if (value instanceof Integer)
+ return Integer.toString((Integer) value);
+ if (value instanceof Double)
+ return Double.toString((Double) value);
+ if (value instanceof Float)
+ return Float.toString((Float) value);
+ throw new IllegalArgumentException("Unknown type " + value.getClass());
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 177cbb9566..94b8bf8f73 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -51,6 +51,7 @@ import static
org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.JMX;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static
org.apache.cassandra.distributed.test.jmx.JMXGetterCheckTest.testAllValidGetters;
import static org.hamcrest.Matchers.startsWith;
/* Resource Leak Test - useful when tracking down issues with in-JVM framework
cleanup.
@@ -73,7 +74,7 @@ public class ResourceLeakTest extends TestBaseImpl
final boolean dumpEveryLoop = false; // Dump heap & possibly files every
loop
final boolean dumpFileHandles = false; // Call lsof whenever dumping
resources
final boolean forceCollection = false; // Whether to explicitly force
finalization/gc for smaller heap dumps
- final long finalWaitMillis = 0l; // Number of millis to wait before
final resource dump to give gc a chance
+ final long finalWaitMillis = 0L; // Number of millis to wait before
final resource dump to give gc a chance
static final SimpleDateFormat format = new
SimpleDateFormat("yyyyMMddHHmmss");
static final String when = format.format(Date.from(Instant.now()));
@@ -148,6 +149,30 @@ public class ResourceLeakTest extends TestBaseImpl
}
}
+    /**
+     * Checks JMX on every instance of {@code cluster}. It connects to each instance's JMX server and uses the
+     * MBean server's default domain to confirm the connection reached that instance. It then exercises all
+     * valid MBean getters via testAllValidGetters.
+     * Any {@link Exception} is rethrown wrapped in a RuntimeException; assertion failures propagate unchanged.
+     */
+    static void testJmx(Cluster cluster)
+    {
+        try
+        {
+            for (IInvokableInstance instance : cluster.get(1, cluster.size()))
+            {
+                IInstanceConfig config = instance.config();
+                // the second argument (5) is presumably the number of connection attempts - confirm against JMXUtil
+                try (JMXConnector jmxc = JMXUtil.getJmxConnector(config, 5))
+                {
+                    MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+                    // instances get their default domain set to their IP address, so use it
+                    // to check that we are actually connecting to the correct instance
+                    String defaultDomain = mbsc.getDefaultDomain();
+                    Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+                }
+            }
+            testAllValidGetters(cluster);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws
Throwable
{
doTest(numClusterNodes, updater, ignored -> {});
@@ -231,27 +256,7 @@ public class ResourceLeakTest extends TestBaseImpl
@Test
public void looperJmxTest() throws Throwable
{
- doTest(1, config -> config.with(JMX), cluster -> {
- // NOTE: At some point, the hostname of the broadcastAddress can
be resolved
- // and then the `getHostString`, which would otherwise return the
IP address,
- // starts returning `localhost` - use
`.getAddress().getHostAddress()` to work around this.
- for (IInvokableInstance instance:cluster.get(1, cluster.size()))
- {
- IInstanceConfig config = instance.config();
- try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
- {
- MBeanServerConnection mbsc =
jmxc.getMBeanServerConnection();
- // instances get their default domain set to their IP
address, so us it
- // to check that we are actually connecting to the correct
instance
- String defaultDomain = mbsc.getDefaultDomain();
- Assert.assertThat(defaultDomain,
startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- });
+ doTest(2, config -> config.with(JMX), ResourceLeakTest::testJmx);
if (forceCollection)
{
System.runFinalization();
@@ -264,7 +269,10 @@ public class ResourceLeakTest extends TestBaseImpl
@Test
public void looperEverythingTest() throws Throwable
{
- doTest(1, config -> config.with(Feature.values()));
+ doTest(2, config -> config.with(Feature.values()),
+ cluster -> {
+ testJmx(cluster);
+ });
if (forceCollection)
{
System.runFinalization();
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
index 1c38bd11e9..e227a24f8e 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.distributed.test.jmx;
-import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import javax.management.MBeanServerConnection;
@@ -31,13 +30,20 @@ import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.distributed.test.TestBaseImpl;
+import static
org.apache.cassandra.distributed.test.jmx.JMXGetterCheckTest.testAllValidGetters;
+import static org.hamcrest.Matchers.blankOrNullString;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
public class JMXFeatureTest extends TestBaseImpl
{
+
/**
* Test the in-jvm dtest JMX feature.
* - Create a cluster with multiple JMX servers, one per instance
@@ -47,7 +53,7 @@ public class JMXFeatureTest extends TestBaseImpl
* ports in addition to IP/Host for binding, but this version does not
support that feature. Keeping the test name the same
* so that it's consistent across versions.
*
- * @throws Exception
+ * @throws Exception it's a test that calls JMX endpoints - lots of
different Jmx exceptions are possible
*/
@Test
public void testMultipleNetworkInterfacesProvisioning() throws Exception
@@ -59,18 +65,57 @@ public class JMXFeatureTest extends TestBaseImpl
try (Cluster cluster = Cluster.build(2).withConfig(c ->
c.with(Feature.values())).start())
{
Set<String> instancesContacted = new HashSet<>();
- for (IInvokableInstance instance : cluster.get(1, 2))
+ for (IInvokableInstance instance : cluster)
{
testInstance(instancesContacted, instance);
}
Assert.assertEquals("Should have connected with both JMX
instances.", 2, instancesContacted.size());
allInstances.addAll(instancesContacted);
+ // Make sure we actually exercise the mbeans by testing a
bunch of getters.
+ // Without this it's possible for the test to pass as we don't
touch any mBeans that we register.
+ testAllValidGetters(cluster);
}
}
Assert.assertEquals("Each instance from each cluster should have been
unique", iterations * 2, allInstances.size());
}
- private void testInstance(Set<String> instancesContacted,
IInvokableInstance instance) throws IOException
+ /**
+ * Regression test for stopping and restarting one instance with JMX enabled (the NOTE below describes
+ * the failure this guards against). Instance 1 is stopped and restarted while instance 2 keeps running.
+ * JMX connectivity and {@code nodetool status} are checked while instance 1 is down and after it is back.
+ * All valid MBean getters are exercised before the stop and after the restart.
+ */
+ @Test
+ public void testShutDownAndRestartInstances() throws Exception
+ {
+ HashSet<String> instances = new HashSet<>();
+ try (Cluster cluster = Cluster.build(2).withConfig(c ->
c.with(Feature.values())).start())
+ {
+ IInvokableInstance instanceToStop = cluster.get(1);
+ IInvokableInstance otherInstance = cluster.get(2);
+ testInstance(instances, cluster.get(1));
+ testAllValidGetters(cluster);
+ ClusterUtils.stopUnchecked(instanceToStop);
+ // NOTE: This would previously fail because we cleared everything
from the TCPEndpoint map in IsolatedJmx.
+ // Now, we only clear the endpoints related to that instance,
which prevents this code from
+ // breaking with a `java.net.BindException: Address already in use
(Bind failed)`
+ ClusterUtils.awaitRingStatus(otherInstance, instanceToStop,
"Down");
+ // instance 2 should now report 127.0.0.1 (presumably instance 1's address) as DN (Down/Normal)
+ NodeToolResult statusResult =
cluster.get(2).nodetoolResult("status");
+ Assert.assertEquals(0, statusResult.getRc());
+ Assert.assertThat(statusResult.getStderr(),
is(blankOrNullString()));
+ Assert.assertThat(statusResult.getStdout(), containsString("DN
127.0.0.1"));
+ // JMX on the surviving instance must keep working while the other one is down
+ testInstance(instances, cluster.get(2));
+ // restart the stopped instance without setting any extra system properties
+ ClusterUtils.start(instanceToStop, props -> {});
+ ClusterUtils.awaitRingState(otherInstance, instanceToStop,
"Normal");
+ ClusterUtils.awaitRingStatus(otherInstance, instanceToStop, "Up");
+ // both instances should report 127.0.0.1 as UN (Up/Normal) again
+ statusResult = cluster.get(1).nodetoolResult("status");
+ Assert.assertEquals(0, statusResult.getRc());
+ Assert.assertThat(statusResult.getStderr(),
is(blankOrNullString()));
+ Assert.assertThat(statusResult.getStdout(), containsString("UN
127.0.0.1"));
+ statusResult = cluster.get(2).nodetoolResult("status");
+ Assert.assertEquals(0, statusResult.getRc());
+ Assert.assertThat(statusResult.getStderr(),
is(blankOrNullString()));
+ Assert.assertThat(statusResult.getStdout(), containsString("UN
127.0.0.1"));
+ // JMX on the restarted instance must work again
+ testInstance(instances, cluster.get(1));
+ testAllValidGetters(cluster);
+ }
+ }
+
+ private void testInstance(Set<String> instancesContacted,
IInvokableInstance instance)
{
IInstanceConfig config = instance.config();
try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
@@ -82,5 +127,9 @@ public class JMXFeatureTest extends TestBaseImpl
instancesContacted.add(defaultDomain);
Assert.assertThat(defaultDomain,
startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
}
+ catch (Throwable t)
+ {
+ throw new RuntimeException("Could not connect to JMX", t);
+ }
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
index a3f74df60d..bbcfa52ebc 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -29,44 +29,68 @@ import javax.management.MBeanOperationInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
import com.google.common.collect.ImmutableSet;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.distributed.test.TestBaseImpl;
public class JMXGetterCheckTest extends TestBaseImpl
{
private static final Set<String> IGNORE_ATTRIBUTES = ImmutableSet.of(
- "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost" //
throws unsupported saying the feature was removed... dropped in CASSANDRA-15375
+ "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost", //
throws unsupported saying the feature was removed... dropped in CASSANDRA-15375
+ "org.apache.cassandra.db:type=StorageService:MaxNativeProtocolVersion" //
Throws NPE
);
private static final Set<String> IGNORE_OPERATIONS = ImmutableSet.of(
"org.apache.cassandra.db:type=StorageService:stopDaemon", // halts the
instance, which then causes the JVM to exit
"org.apache.cassandra.db:type=StorageService:drain", // don't drain, it
stops things which can cause other APIs to be unstable as we are in a stopped
state
"org.apache.cassandra.db:type=StorageService:stopGossiping", // if we stop
gossip this can cause other issues, so avoid
"org.apache.cassandra.db:type=StorageService:resetLocalSchema", // this
will fail when there are no other nodes which can serve schema
-
"org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints",
// this will fail because it only exists to match an old, deprecated mbean and
just throws an UnsportedOperationException
- "org.apache.cassandra.db:type=StorageService:decommission" // Don't
decommission nodes! Note that in future versions of C* this is unnecessary
because decommission takes an argument.
+ "org.apache.cassandra.db:type=StorageService:joinRing", // Causes
bootstrapping errors
+ "org.apache.cassandra.db:type=StorageService:clearConnectionHistory", //
Throws a NullPointerException
+ "org.apache.cassandra.db:type=StorageService:startGossiping", // causes
multiple loops to fail
+ "org.apache.cassandra.db:type=StorageService:startNativeTransport", //
causes multiple loops to fail
+
"org.apache.cassandra.db:type=Tables,keyspace=system,table=local:loadNewSSTables",
// Shouldn't attempt to load SSTables as sometimes the temp directories don't
work
+
"org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints",
// hard-coded to throw UnsupportedOperationException
+ "org.apache.cassandra.db:type=StorageService:startRPCServer", // In looped
tests this can sometimes fail
+ "org.apache.cassandra.db:type=StorageService:decommission" // shutting
down the node and decommissioning it is bad - in later versions, it takes a
parameter and wouldn't be run in this test
);
- public static final String JMX_SERVICE_URL_FMT =
"service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
-
@Test
public void testGetters() throws Exception
{
- try (Cluster cluster = Cluster.build(1).withConfig(c ->
c.with(Feature.values())).start())
+ for (int i=0; i < 2; i++)
{
- IInvokableInstance instance = cluster.get(1);
+ try (Cluster cluster = Cluster.build(1).withConfig(c ->
c.with(Feature.values())).start())
+ {
+ testAllValidGetters(cluster);
+ }
+ }
+ }
- String jmxHost =
instance.config().broadcastAddress().getAddress().getHostAddress();
- String url = String.format(JMX_SERVICE_URL_FMT, jmxHost,
instance.config().jmxPort());
+ /**
+ * Tests JMX getters and operations.
+ * Useful for more than just testing getters, it also is used in
JMXFeatureTest
+ * to make sure we've touched the complete JMX code path.
+ * @param cluster the cluster to test
+ * @throws Exception several kinds of exceptions can be thrown, mostly
from JMX infrastructure issues.
+ */
+ public static void testAllValidGetters(Cluster cluster) throws Exception
+ {
+ for (IInvokableInstance instance: cluster)
+ {
+ if (instance.isShutdown())
+ {
+ continue;
+ }
+ IInstanceConfig config = instance.config();
List<Named> errors = new ArrayList<>();
- try (JMXConnector jmxc = JMXConnectorFactory.connect(new
JMXServiceURL(url), null))
+ try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
{
MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
Set<ObjectName> metricNames = new
TreeSet<>(mbsc.queryNames(null, null));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]