This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new c6d7d070c5 IsolatedJMX should not release all TCPEndpoints on instance 
shutdown
c6d7d070c5 is described below

commit c6d7d070c59d81db8949683d3e5670b909efb48c
Author: Doug Rohrer <[email protected]>
AuthorDate: Wed Aug 23 18:03:30 2023 -0600

    IsolatedJMX should not release all TCPEndpoints on instance shutdown
    
    patch by Doug Rohrer; reviewed by Jon Meredith, Stefan Miklosovic for 
CASSANDRA-18725
---
 build.xml                                          |   2 +-
 .../utils/RMIClientSocketFactoryImpl.java          |  23 ++-
 .../apache/cassandra/utils/ReflectionUtils.java    | 101 +++++++++
 .../cassandra/distributed/impl/Instance.java       |   5 +-
 .../cassandra/distributed/impl/IsolatedJmx.java    |  95 ++++-----
 .../cassandra/distributed/shared/ClusterUtils.java | 226 +++++++++++++++++++++
 .../distributed/shared/WithProperties.java         | 112 ++++++++++
 .../distributed/test/ResourceLeakTest.java         |  54 ++---
 .../distributed/test/jmx/JMXFeatureTest.java       |  57 +++++-
 .../distributed/test/jmx/JMXGetterCheckTest.java   |  48 +++--
 10 files changed, 627 insertions(+), 96 deletions(-)

diff --git a/build.xml b/build.xml
index c634761566..70a195f98d 100644
--- a/build.xml
+++ b/build.xml
@@ -383,7 +383,7 @@
             <exclusion groupId="org.hamcrest" artifactId="hamcrest-core"/>
           </dependency>
           <dependency groupId="org.mockito" artifactId="mockito-core" 
version="3.2.4" scope="test"/>
-          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.15" scope="test"/>
+          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.16" scope="test"/>
           <dependency groupId="org.reflections" artifactId="reflections" 
version="0.10.2" scope="test"/>
           <dependency groupId="org.quicktheories" artifactId="quicktheories" 
version="0.25" scope="test"/>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" 
version="1.0.3" scope="provided">
diff --git 
a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java 
b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
index 62ab88fb92..c04e934fdc 100644
--- a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
+++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.utils;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.rmi.server.RMIClientSocketFactory;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -32,6 +35,7 @@ import java.util.Objects;
  */
 public class RMIClientSocketFactoryImpl implements RMIClientSocketFactory, 
Serializable
 {
+    List<Socket> sockets = new ArrayList<>();
     private final InetAddress localAddress;
 
     public RMIClientSocketFactoryImpl(InetAddress localAddress)
@@ -42,7 +46,24 @@ public class RMIClientSocketFactoryImpl implements 
RMIClientSocketFactory, Seria
     @Override
     public Socket createSocket(String host, int port) throws IOException
     {
-        return new Socket(localAddress, port);
+        Socket socket = new Socket(localAddress, port);
+        sockets.add(socket);
+        return socket;
+    }
+
+    public void close() throws IOException
+    {
+        for (Socket socket: sockets)
+        {
+            try
+            {
+                socket.close();
+            }
+            catch (IOException ignored)
+            {
+                // intentionally ignored
+            }
+        }
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/utils/ReflectionUtils.java 
b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
new file mode 100644
index 0000000000..3d1cc29565
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.Predicate;
+
+public class ReflectionUtils
+{
+    public static Field getField(Class<?> clazz, String fieldName) throws 
NoSuchFieldException
+    {
+        // below code works before Java 12
+        try
+        {
+            return clazz.getDeclaredField(fieldName);
+        }
+        catch (NoSuchFieldException e)
+        {
+            // this is mitigation for JDK 17 
(https://bugs.openjdk.org/browse/JDK-8210522)
+            try
+            {
+                Method getDeclaredFields0 = 
Class.class.getDeclaredMethod("getDeclaredFields0", boolean.class);
+                getDeclaredFields0.setAccessible(true);
+                Field[] fields = (Field[]) getDeclaredFields0.invoke(clazz, 
false);
+                for (Field field : fields)
+                {
+                    if (fieldName.equals(field.getName()))
+                    {
+                        return field;
+                    }
+                }
+            }
+            catch (ReflectiveOperationException ex)
+            {
+                e.addSuppressed(ex);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Used by the in-jvm dtest framework to remove entries from private map 
fields that otherwise would prevent
+     * collection of classloaders (which causes metaspace OOMs) or otherwise 
interfere with instance restart.
+     *
+     * @param clazz        The class which has the map field to clear
+     * @param instance     an instance of the class to clear (pass null for a 
static member)
+     * @param mapName      the name of the map field to clear
+     * @param shouldRemove a predicate which determines if the entry in 
question should be removed
+     * @param <K>          The type of the map key
+     * @param <V>          The type of the map value
+     */
+    public static <K, V> void clearMapField(Class<?> clazz, Object instance, 
String mapName, Predicate<Map.Entry<K, V>> shouldRemove)
+    {
+        try
+        {
+            Field mapField = getField(clazz, mapName);
+            mapField.setAccessible(true);
+            // noinspection unchecked
+            Map<K, V> map = (Map<K, V>) mapField.get(instance);
+            // Because multiple instances can be shutting down at once,
+            // synchronize on the map to avoid ConcurrentModificationException
+            synchronized (map)
+            {
+                // This could be done with a simple `map.entrySet.removeIf()` 
call
+                // but for debugging purposes it is much easier to keep it 
like this.
+                Iterator<Map.Entry<K, V>> it = map.entrySet().iterator();
+                while (it.hasNext())
+                {
+                    Map.Entry<K, V> entry = it.next();
+                    if (shouldRemove.test(entry))
+                    {
+                        it.remove();
+                    }
+                }
+            }
+        }
+        catch (NoSuchFieldException | IllegalAccessException ex)
+        {
+            throw new RuntimeException(String.format("Could not clear map 
field %s in class %s", mapName, clazz), ex);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index cca6296629..bbf98ca022 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -614,17 +614,18 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         initialized = true;
     }
 
-    private void startJmx()
+    private synchronized void startJmx()
     {
         isolatedJmx = new IsolatedJmx(this, inInstancelogger);
         isolatedJmx.startJmx();
     }
 
-    private void stopJmx() throws IllegalAccessException, 
NoSuchFieldException, InterruptedException
+    private synchronized void stopJmx()
     {
         if (config.has(JMX))
         {
             isolatedJmx.stopJmx();
+            isolatedJmx = null;
         }
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java 
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
index b3d06590a9..e586cae58d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
@@ -18,14 +18,13 @@
 
 package org.apache.cassandra.distributed.impl;
 
-import java.lang.reflect.Field;
+import java.io.IOException;
 import java.net.InetAddress;
-import java.net.MalformedURLException;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXConnectorServer;
 import javax.management.remote.JMXServiceURL;
 import javax.management.remote.rmi.RMIConnectorServer;
@@ -34,6 +33,8 @@ import javax.management.remote.rmi.RMIJRMPServerImpl;
 import org.slf4j.Logger;
 
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
 import org.apache.cassandra.utils.JMXServerUtils;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.RMIClientSocketFactoryImpl;
@@ -41,6 +42,7 @@ import sun.rmi.transport.tcp.TCPEndpoint;
 
 import static org.apache.cassandra.distributed.api.Feature.JMX;
 import static 
org.apache.cassandra.utils.MBeanWrapper.IS_DISABLED_MBEAN_REGISTRATION;
+import static org.apache.cassandra.utils.ReflectionUtils.clearMapField;
 
 public class IsolatedJmx
 {
@@ -48,7 +50,8 @@ public class IsolatedJmx
     private static final String SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME = 
"sun.rmi.transport.tcp.threadKeepAliveTime";
     /** Controls the distributed garbage collector lease time for JMX objects. 
*/
     private static final String JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST 
="java.rmi.dgc.leaseValue";
-    private static final int RMI_KEEPALIVE_TIME = 1000;
+    public static final int RMI_KEEPALIVE_TIME = 1000;
+    public static final String UNKNOWN_JMX_CONNECTION_ERROR = "Could not 
connect to JMX due to an unknown error";
 
     private JMXConnectorServer jmxConnectorServer;
     private JMXServerUtils.JmxRegistry registry;
@@ -59,7 +62,8 @@ public class IsolatedJmx
     private Logger inInstancelogger;
     private IInstanceConfig config;
 
-    public IsolatedJmx(Instance instance, Logger inInstancelogger) {
+    public IsolatedJmx(Instance instance, Logger inInstancelogger)
+    {
         this.inInstancelogger = inInstancelogger;
         config = instance.config();
     }
@@ -127,45 +131,29 @@ public class IsolatedJmx
 
             registry.setRemoteServerStub(jmxRmiServer.toStub());
             JMXServerUtils.logJmxServiceUrl(addr, jmxPort);
-            waitForJmxAvailability(hostname, jmxPort, env);
+            waitForJmxAvailability(env);
         }
-        catch (Throwable e)
+        catch (Throwable t)
         {
-            throw new RuntimeException("Feature.JMX was enabled but could not 
be started.", e);
+            throw new RuntimeException("Feature.JMX was enabled but could not 
be started.", t);
         }
     }
 
-
-    private void waitForJmxAvailability(String hostname, int rmiPort, 
Map<String, Object> env) throws InterruptedException, MalformedURLException
+    private void waitForJmxAvailability(Map<String, ?> env)
     {
-        String url = 
String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort);
-        JMXServiceURL serviceURL = new JMXServiceURL(url);
-        int attempts = 0;
-        Throwable lastThrown = null;
-        while (attempts < 20)
+        try (JMXConnector ignored = JMXUtil.getJmxConnector(config, 20, env))
         {
-            attempts++;
-            try (JMXConnector ignored = 
JMXConnectorFactory.connect(serviceURL, env))
-            {
-                inInstancelogger.info("Connected to JMX server at {} after {} 
attempt(s)",
-                                      url, attempts);
-                return;
-            }
-            catch (MalformedURLException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (Throwable thrown)
-            {
-                lastThrown = thrown;
-            }
-            inInstancelogger.info("Could not connect to JMX on {} after {} 
attempts. Will retry.", url, attempts);
-            Thread.sleep(1000);
+            // Do nothing - JMXUtil now retries
+        }
+        catch (IOException iex)
+        {
+            // If we land here, there's something more than a timeout
+            inInstancelogger.error(UNKNOWN_JMX_CONNECTION_ERROR, iex);
+            throw new RuntimeException(UNKNOWN_JMX_CONNECTION_ERROR, iex);
         }
-        throw new RuntimeException("Could not start JMX - unreachable after 20 
attempts", lastThrown);
     }
 
-    public void stopJmx() throws IllegalAccessException, NoSuchFieldException, 
InterruptedException
+    public void stopJmx()
     {
         if (!config.has(JMX))
             return;
@@ -198,6 +186,14 @@ public class IsolatedJmx
             inInstancelogger.warn("failed to close registry.", e);
         }
         try
+        {
+            clientSocketFactory.close();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close clientSocketFactory.", e);
+        }
+        try
         {
             serverSocketFactory.close();
         }
@@ -207,25 +203,18 @@ public class IsolatedJmx
         }
         // The TCPEndpoint class holds references to a class in the in-jvm 
dtest framework
         // which transitively has a reference to the InstanceClassLoader, so 
we need to
-        // make sure to remove the reference to them when the instance is 
shutting down
-        clearMapField(TCPEndpoint.class, null, "localEndpoints");
-        Thread.sleep(2 * RMI_KEEPALIVE_TIME); // Double the keep-alive time to 
give Distributed GC some time to clean up
+        // make sure to remove the reference to them when the instance is 
shutting down.
+        // Additionally, we must make sure to only clear endpoints created by 
this instance
+        // as clearing the entire map can cause issues with starting and 
stopping nodes mid-test.
+        clearMapField(TCPEndpoint.class, null, "localEndpoints", 
this::endpointCreateByThisInstance);
+        Uninterruptibles.sleepUninterruptibly(2 * RMI_KEEPALIVE_TIME, 
TimeUnit.MILLISECONDS); // Double the keep-alive time to give Distributed GC 
some time to clean up
     }
 
-    private <K, V> void clearMapField(Class<?> clazz, Object instance, String 
mapName)
-    throws IllegalAccessException, NoSuchFieldException {
-        Field mapField = clazz.getDeclaredField(mapName);
-        mapField.setAccessible(true);
-        Map<K, V> map = (Map<K, V>) mapField.get(instance);
-        // Because multiple instances can be shutting down at once,
-        // synchronize on the map to avoid ConcurrentModificationException
-        synchronized (map)
-        {
-            for (Iterator<Map.Entry<K, V>> it = map.entrySet().iterator(); 
it.hasNext(); )
-            {
-                it.next();
-                it.remove();
-            }
-        }
+    private boolean endpointCreateByThisInstance(Map.Entry<Object, 
LinkedList<TCPEndpoint>> entry)
+    {
+        return entry.getValue()
+                    .stream()
+                    .anyMatch(ep -> ep.getServerSocketFactory() == 
this.serverSocketFactory &&
+                                    ep.getClientSocketFactory() == 
this.clientSocketFactory);
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java 
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
new file mode 100644
index 0000000000..3e9798ea02
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.util.concurrent.Futures;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+public class ClusterUtils
+{
+
+    /**
+     * Start the instance with the given System Properties, after the instance 
has started, the properties will be cleared.
+     */
+    public static <I extends IInstance> I start(I inst, 
Consumer<WithProperties> fn)
+    {
+        return start(inst, (ignore, prop) -> fn.accept(prop));
+    }
+
+    /**
+     * Start the instance with the given System Properties, after the instance 
has started, the properties will be cleared.
+     */
+    public static <I extends IInstance> I start(I inst, BiConsumer<I, 
WithProperties> fn)
+    {
+        try (WithProperties properties = new WithProperties())
+        {
+            fn.accept(inst, properties);
+            inst.startup();
+            return inst;
+        }
+    }
+
+    /**
+     * Stop an instance in a blocking manner.
+     *
+     * The main difference between this and {@link IInstance#shutdown()} is 
that the wait on the future will catch
+     * the exceptions and throw as runtime.
+     */
+    public static void stopUnchecked(IInstance i)
+    {
+        Futures.getUnchecked(i.shutdown());
+    }
+
+    public static List<RingInstanceDetails> awaitRingStatus(IInstance 
instance, IInstance expectedInRing, String status)
+    {
+        return awaitInstanceMatching(instance, expectedInRing, d -> 
d.status.equals(status),
+                                     "Timeout waiting for " + expectedInRing + 
" to have status " + status);
+    }
+
+    /**
+     * Wait for the ring to have the target instance with the provided state.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing to look for
+     * @param state expected
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingState(IInstance instance, 
IInstance expectedInRing, String state)
+    {
+        return awaitInstanceMatching(instance, expectedInRing, d -> 
d.state.equals(state),
+                                     "Timeout waiting for " + expectedInRing + 
" to have state " + state);
+    }
+
+    private static List<RingInstanceDetails> awaitInstanceMatching(IInstance 
instance,
+                                                                   IInstance 
expectedInRing,
+                                                                   
Predicate<RingInstanceDetails> predicate,
+                                                                   String 
errorMessage)
+    {
+        return awaitRing(instance,
+                         errorMessage,
+                         ring -> ring.stream()
+                                     .filter(d -> 
d.address.equals(getBroadcastAddressHostString(expectedInRing)))
+                                     .anyMatch(predicate));
+    }
+
+    private static List<RingInstanceDetails> awaitRing(IInstance src, String 
errorMessage, Predicate<List<RingInstanceDetails>> fn)
+    {
+        List<RingInstanceDetails> ring = null;
+        for (int i = 0; i < 100; i++)
+        {
+            ring = ring(src);
+            if (fn.test(ring))
+            {
+                return ring;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + ring);
+    }
+
+    /**
+     * Get the broadcast address host address only (ex. 127.0.0.1)
+     */
+    private static String getBroadcastAddressHostString(IInstance target)
+    {
+        return 
target.config().broadcastAddress().getAddress().getHostAddress();
+    }
+
+    /**
+     * Get the ring from the perspective of the instance.
+     */
+    public static List<RingInstanceDetails> ring(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("ring");
+        results.asserts().success();
+        return parseRing(results.getStdout());
+    }
+
+    private static List<RingInstanceDetails> parseRing(String str)
+    {
+        // 127.0.0.3  rack0       Up     Normal  46.21 KB        100.00%       
      -1
+        // /127.0.0.1:7012  Unknown     ?      Normal  ?               100.00% 
            -3074457345618258603
+        Pattern pattern = 
Pattern.compile("^(/?[0-9.:]+)\\s+(\\w+|\\?)\\s+(\\w+|\\?)\\s+(\\w+|\\?).*?(-?\\d+)\\s*$");
+        List<RingInstanceDetails> details = new ArrayList<>();
+        String[] lines = str.split("\n");
+        for (String line : lines)
+        {
+            Matcher matcher = pattern.matcher(line);
+            if (!matcher.find())
+            {
+                continue;
+            }
+            details.add(new RingInstanceDetails(matcher.group(1), 
matcher.group(2), matcher.group(3), matcher.group(4), matcher.group(5)));
+        }
+
+        return details;
+    }
+
+    public static final class RingInstanceDetails
+    {
+        private final String address;
+        private final String rack;
+        private final String status;
+        private final String state;
+        private final String token;
+
+        private RingInstanceDetails(String address, String rack, String 
status, String state, String token)
+        {
+            this.address = address;
+            this.rack = rack;
+            this.status = status;
+            this.state = state;
+            this.token = token;
+        }
+
+        public String getAddress()
+        {
+            return address;
+        }
+
+        public String getRack()
+        {
+            return rack;
+        }
+
+        public String getStatus()
+        {
+            return status;
+        }
+
+        public String getState()
+        {
+            return state;
+        }
+
+        public String getToken()
+        {
+            return token;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RingInstanceDetails that = (RingInstanceDetails) o;
+            return Objects.equals(address, that.address) &&
+                   Objects.equals(rack, that.rack) &&
+                   Objects.equals(status, that.status) &&
+                   Objects.equals(state, that.state) &&
+                   Objects.equals(token, that.token);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(address, rack, status, state, token);
+        }
+
+        public String toString()
+        {
+            return Arrays.asList(address, rack, status, state, 
token).toString();
+        }
+    }
+
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java 
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
new file mode 100644
index 0000000000..a2aeae27ac
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Joiner;
+
+public final class WithProperties implements AutoCloseable
+{
+    private final List<Runnable> rollback = new ArrayList<>();
+
+    public WithProperties()
+    {
+    }
+
+    public WithProperties with(String... kvs)
+    {
+        assert kvs.length % 2 == 0 : "Input must have an even amount of inputs 
but given " + kvs.length;
+        for (int i = 0; i <= kvs.length - 2; i = i + 2)
+            with(kvs[i], kvs[i + 1]);
+        return this;
+    }
+
+    public WithProperties set(String prop, String value)
+    {
+        return set(prop, () -> System.setProperty(prop, value));
+    }
+
+    public WithProperties set(String prop, String... values)
+    {
+        return set(prop, Arrays.asList(values));
+    }
+
+    public WithProperties set(String prop, Collection<String> values)
+    {
+        return set(prop, Joiner.on(",").join(values));
+    }
+
+    public WithProperties set(String prop, boolean value)
+    {
+        return set(prop, () -> System.setProperty(prop, 
convertToString(value)));
+    }
+
+    public WithProperties set(String prop, long value)
+    {
+        return set(prop, () -> System.setProperty(prop, 
convertToString(value)));
+    }
+
+    private void with(String key, String value)
+    {
+        String previous = System.setProperty(key, value); // checkstyle: 
suppress nearby 'blockSystemPropertyUsage'
+        rollback.add(previous == null ? () -> System.clearProperty(key) : () 
-> System.setProperty(key, previous)); // checkstyle: suppress nearby 
'blockSystemPropertyUsage'
+    }
+
+    private WithProperties set(String prop, Supplier<Object> prev)
+    {
+        String previous = convertToString(prev.get());
+        rollback.add(previous == null ? () -> System.clearProperty(prop) : () 
-> System.setProperty(prop, previous));
+        return this;
+    }
+
+    @Override
+    public void close()
+    {
+        Collections.reverse(rollback);
+        rollback.forEach(Runnable::run);
+        rollback.clear();
+    }
+
+    public static String convertToString(@Nullable Object value)
+    {
+        if (value == null)
+            return null;
+        if (value instanceof String)
+            return (String) value;
+        if (value instanceof Boolean)
+            return Boolean.toString((Boolean) value);
+        if (value instanceof Long)
+            return Long.toString((Long) value);
+        if (value instanceof Integer)
+            return Integer.toString((Integer) value);
+        if (value instanceof Double)
+            return Double.toString((Double) value);
+        if (value instanceof Float)
+            return Float.toString((Float) value);
+        throw new IllegalArgumentException("Unknown type " + value.getClass());
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 177cbb9566..94b8bf8f73 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -51,6 +51,7 @@ import static 
org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.JMX;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static 
org.apache.cassandra.distributed.test.jmx.JMXGetterCheckTest.testAllValidGetters;
 import static org.hamcrest.Matchers.startsWith;
 
 /* Resource Leak Test - useful when tracking down issues with in-JVM framework 
cleanup.
@@ -73,7 +74,7 @@ public class ResourceLeakTest extends TestBaseImpl
     final boolean dumpEveryLoop = false;   // Dump heap & possibly files every 
loop
     final boolean dumpFileHandles = false; // Call lsof whenever dumping 
resources
     final boolean forceCollection = false; // Whether to explicitly force 
finalization/gc for smaller heap dumps
-    final long finalWaitMillis = 0l;       // Number of millis to wait before 
final resource dump to give gc a chance
+    final long finalWaitMillis = 0L;       // Number of millis to wait before 
final resource dump to give gc a chance
 
     static final SimpleDateFormat format = new 
SimpleDateFormat("yyyyMMddHHmmss");
     static final String when = format.format(Date.from(Instant.now()));
@@ -148,6 +149,30 @@ public class ResourceLeakTest extends TestBaseImpl
         }
     }
 
+    static void testJmx(Cluster cluster)
+    {
+        try
+        {
+            for (IInvokableInstance instance : cluster.get(1, cluster.size()))
+            {
+                IInstanceConfig config = instance.config();
+                try (JMXConnector jmxc = JMXUtil.getJmxConnector(config, 5))
+                {
+                    MBeanServerConnection mbsc = 
jmxc.getMBeanServerConnection();
+                    // instances get their default domain set to their IP 
address, so use it
+                    // to check that we are actually connecting to the correct 
instance
+                    String defaultDomain = mbsc.getDefaultDomain();
+                    Assert.assertThat(defaultDomain, 
startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+                }
+            }
+            testAllValidGetters(cluster);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws 
Throwable
     {
         doTest(numClusterNodes, updater, ignored -> {});
@@ -231,27 +256,7 @@ public class ResourceLeakTest extends TestBaseImpl
     @Test
     public void looperJmxTest() throws Throwable
     {
-        doTest(1, config -> config.with(JMX), cluster -> {
-            // NOTE: At some point, the hostname of the broadcastAddress can 
be resolved
-            // and then the `getHostString`, which would otherwise return the 
IP address,
-            // starts returning `localhost` - use 
`.getAddress().getHostAddress()` to work around this.
-            for (IInvokableInstance instance:cluster.get(1, cluster.size()))
-            {
-                IInstanceConfig config = instance.config();
-                try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
-                {
-                    MBeanServerConnection mbsc = 
jmxc.getMBeanServerConnection();
-                    // instances get their default domain set to their IP 
address, so us it
-                    // to check that we are actually connecting to the correct 
instance
-                    String defaultDomain = mbsc.getDefaultDomain();
-                    Assert.assertThat(defaultDomain, 
startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
-                }
-                catch (IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
+        doTest(2, config -> config.with(JMX), ResourceLeakTest::testJmx);
         if (forceCollection)
         {
             System.runFinalization();
@@ -264,7 +269,10 @@ public class ResourceLeakTest extends TestBaseImpl
     @Test
     public void looperEverythingTest() throws Throwable
     {
-        doTest(1, config -> config.with(Feature.values()));
+        doTest(2, config -> config.with(Feature.values()),
+               cluster -> {
+                   testJmx(cluster);
+               });
         if (forceCollection)
         {
             System.runFinalization();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
index 1c38bd11e9..e227a24f8e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.distributed.test.jmx;
 
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import javax.management.MBeanServerConnection;
@@ -31,13 +30,20 @@ import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.shared.JMXUtil;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 
+import static 
org.apache.cassandra.distributed.test.jmx.JMXGetterCheckTest.testAllValidGetters;
+import static org.hamcrest.Matchers.blankOrNullString;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.startsWith;
 
 public class JMXFeatureTest extends TestBaseImpl
 {
+
     /**
      * Test the in-jvm dtest JMX feature.
      * - Create a cluster with multiple JMX servers, one per instance
@@ -47,7 +53,7 @@ public class JMXFeatureTest extends TestBaseImpl
      * ports in addition to IP/Host for binding, but this version does not 
support that feature. Keeping the test name the same
      * so that it's consistent across versions.
      *
-     * @throws Exception
+     * @throws Exception it's a test that calls JMX endpoints - lots of 
different JMX exceptions are possible
      */
     @Test
     public void testMultipleNetworkInterfacesProvisioning() throws Exception
@@ -59,18 +65,57 @@ public class JMXFeatureTest extends TestBaseImpl
             try (Cluster cluster = Cluster.build(2).withConfig(c -> 
c.with(Feature.values())).start())
             {
                 Set<String> instancesContacted = new HashSet<>();
-                for (IInvokableInstance instance : cluster.get(1, 2))
+                for (IInvokableInstance instance : cluster)
                 {
                     testInstance(instancesContacted, instance);
                 }
                 Assert.assertEquals("Should have connected with both JMX 
instances.", 2, instancesContacted.size());
                 allInstances.addAll(instancesContacted);
+                // Make sure we actually exercise the mbeans by testing a 
bunch of getters.
+                // Without this it's possible for the test to pass as we don't 
touch any mBeans that we register.
+                testAllValidGetters(cluster);
             }
         }
         Assert.assertEquals("Each instance from each cluster should have been 
unique", iterations * 2, allInstances.size());
     }
 
-    private void testInstance(Set<String> instancesContacted, 
IInvokableInstance instance) throws IOException
+    @Test
+    public void testShutDownAndRestartInstances() throws Exception
+    {
+        HashSet<String> instances = new HashSet<>();
+        try (Cluster cluster = Cluster.build(2).withConfig(c -> 
c.with(Feature.values())).start())
+        {
+            IInvokableInstance instanceToStop = cluster.get(1);
+            IInvokableInstance otherInstance = cluster.get(2);
+            testInstance(instances, cluster.get(1));
+            testAllValidGetters(cluster);
+            ClusterUtils.stopUnchecked(instanceToStop);
+            // NOTE: This would previously fail because we cleared everything 
from the TCPEndpoint map in IsolatedJmx.
+            // Now, we only clear the endpoints related to that instance, 
which prevents this code from
+            // breaking with a `java.net.BindException: Address already in use 
(Bind failed)`
+            ClusterUtils.awaitRingStatus(otherInstance, instanceToStop, 
"Down");
+            NodeToolResult statusResult = 
cluster.get(2).nodetoolResult("status");
+            Assert.assertEquals(0, statusResult.getRc());
+            Assert.assertThat(statusResult.getStderr(), 
is(blankOrNullString()));
+            Assert.assertThat(statusResult.getStdout(), containsString("DN  
127.0.0.1"));
+            testInstance(instances, cluster.get(2));
+            ClusterUtils.start(instanceToStop, props -> {});
+            ClusterUtils.awaitRingState(otherInstance, instanceToStop, 
"Normal");
+            ClusterUtils.awaitRingStatus(otherInstance, instanceToStop, "Up");
+            statusResult = cluster.get(1).nodetoolResult("status");
+            Assert.assertEquals(0, statusResult.getRc());
+            Assert.assertThat(statusResult.getStderr(), 
is(blankOrNullString()));
+            Assert.assertThat(statusResult.getStdout(), containsString("UN  
127.0.0.1"));
+            statusResult = cluster.get(2).nodetoolResult("status");
+            Assert.assertEquals(0, statusResult.getRc());
+            Assert.assertThat(statusResult.getStderr(), 
is(blankOrNullString()));
+            Assert.assertThat(statusResult.getStdout(), containsString("UN  
127.0.0.1"));
+            testInstance(instances, cluster.get(1));
+            testAllValidGetters(cluster);
+        }
+    }
+
+    private void testInstance(Set<String> instancesContacted, 
IInvokableInstance instance)
     {
         IInstanceConfig config = instance.config();
         try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
@@ -82,5 +127,9 @@ public class JMXFeatureTest extends TestBaseImpl
             instancesContacted.add(defaultDomain);
             Assert.assertThat(defaultDomain, 
startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
         }
+        catch (Throwable t)
+        {
+            throw new RuntimeException("Could not connect to JMX", t);
+        }
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
index a3f74df60d..bbcfa52ebc 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -29,44 +29,68 @@ import javax.management.MBeanOperationInfo;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
 
 import com.google.common.collect.ImmutableSet;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 
 public class JMXGetterCheckTest extends TestBaseImpl
 {
     private static final Set<String> IGNORE_ATTRIBUTES = ImmutableSet.of(
-    "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost" // 
throws unsupported saying the feature was removed... dropped in CASSANDRA-15375
+    "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost", // 
throws unsupported saying the feature was removed... dropped in CASSANDRA-15375
+    "org.apache.cassandra.db:type=StorageService:MaxNativeProtocolVersion" // 
Throws NPE
     );
     private static final Set<String> IGNORE_OPERATIONS = ImmutableSet.of(
     "org.apache.cassandra.db:type=StorageService:stopDaemon", // halts the 
instance, which then causes the JVM to exit
     "org.apache.cassandra.db:type=StorageService:drain", // don't drain, it 
stops things which can cause other APIs to be unstable as we are in a stopped 
state
     "org.apache.cassandra.db:type=StorageService:stopGossiping", // if we stop 
gossip this can cause other issues, so avoid
     "org.apache.cassandra.db:type=StorageService:resetLocalSchema", // this 
will fail when there are no other nodes which can serve schema
-    
"org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints", 
// this will fail because it only exists to match an old, deprecated mbean and 
just throws an UnsportedOperationException
-    "org.apache.cassandra.db:type=StorageService:decommission" // Don't 
decommission nodes! Note that in future versions of C* this is unnecessary 
because decommission takes an argument.
+    "org.apache.cassandra.db:type=StorageService:joinRing", // Causes 
bootstrapping errors
+    "org.apache.cassandra.db:type=StorageService:clearConnectionHistory", // 
Throws a NullPointerException
+    "org.apache.cassandra.db:type=StorageService:startGossiping", // causes 
multiple loops to fail
+    "org.apache.cassandra.db:type=StorageService:startNativeTransport", // 
causes multiple loops to fail
+    
"org.apache.cassandra.db:type=Tables,keyspace=system,table=local:loadNewSSTables",
 // Shouldn't attempt to load SSTables as sometimes the temp directories don't 
work
+    
"org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints", 
// hard-coded to throw UnsupportedOperationException
+    "org.apache.cassandra.db:type=StorageService:startRPCServer", // In looped 
tests this can sometimes fail
+    "org.apache.cassandra.db:type=StorageService:decommission" // shutting 
down the node and decommissioning it is bad - in later versions, it takes a 
parameter and wouldn't be run in this test
     );
 
-    public static final String JMX_SERVICE_URL_FMT = 
"service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
-
     @Test
     public void testGetters() throws Exception
     {
-        try (Cluster cluster = Cluster.build(1).withConfig(c -> 
c.with(Feature.values())).start())
+        for (int i=0; i < 2; i++)
         {
-            IInvokableInstance instance = cluster.get(1);
+            try (Cluster cluster = Cluster.build(1).withConfig(c -> 
c.with(Feature.values())).start())
+            {
+                testAllValidGetters(cluster);
+            }
+        }
+    }
 
-            String jmxHost = 
instance.config().broadcastAddress().getAddress().getHostAddress();
-            String url = String.format(JMX_SERVICE_URL_FMT, jmxHost, 
instance.config().jmxPort());
+    /**
+     * Tests JMX getters and operations.
+     * Useful for more than just testing getters; it is also used in 
JMXFeatureTest
+     * to make sure we've touched the complete JMX code path.
+     * @param cluster the cluster to test
+     * @throws Exception several kinds of exceptions can be thrown, mostly 
from JMX infrastructure issues.
+     */
+    public static void testAllValidGetters(Cluster cluster) throws Exception
+    {
+        for (IInvokableInstance instance: cluster)
+        {
+            if (instance.isShutdown())
+            {
+                continue;
+            }
+            IInstanceConfig config = instance.config();
             List<Named> errors = new ArrayList<>();
-            try (JMXConnector jmxc = JMXConnectorFactory.connect(new 
JMXServiceURL(url), null))
+            try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
             {
                 MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
                 Set<ObjectName> metricNames = new 
TreeSet<>(mbsc.queryNames(null, null));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to