This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5471b66  CASSANDRASC-44 Refactor health check to use vertx timer
5471b66 is described below

commit 5471b66c1e69e057bbbe75e4ffe67c1891cd9495
Author: Francisco Guerrero <[email protected]>
AuthorDate: Mon Oct 10 14:35:12 2022 -0700

    CASSANDRASC-44 Refactor health check to use vertx timer
    
    Vertx API offers a periodic timer that integrates with its internal thread 
pooling
    mechanism. In this commit, we utilize vertx's periodic timer in favor of 
using a
    `Executors.newSingleThreadScheduledExecutor()` on each delegate.
    
    Another benefit of this approach is that if the cluster topology changes, 
i.e.
    node replacement, cluster expansion / shrink, then the health checks will 
be performed
    against the actual nodes in the cluster, assuming we receive an updated 
view of the
    cluster when invoking the `Configuration#getInstancesConfig()#instances()` 
method.
    We no longer need to worry about decommissioning the single thread 
executors running
    on each delegate.
    
    patch by Francisco Guerrero; reviewed by Yifan Cai, Dinesh Joshi for 
CASSANDRASC-44
---
 .../sidecar/common/CassandraAdapterDelegate.java   | 111 ++++++++++-----------
 .../cassandra/sidecar/CassandraSidecarDaemon.java  |  24 ++++-
 .../cluster/instance/InstanceMetadataImpl.java     |   2 +-
 .../org/apache/cassandra/sidecar/TestModule.java   |   2 -
 4 files changed, 76 insertions(+), 63 deletions(-)

diff --git 
a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
 
b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
index 21a9c8a..ee3b0ce 100644
--- 
a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
+++ 
b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
@@ -19,10 +19,7 @@
 package org.apache.cassandra.sidecar.common;
 
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,87 +36,67 @@ import org.jetbrains.annotations.NotNull;
  * of the underlying Cassandra adapter.  If a server reboots, we can swap out 
the right Adapter when the driver
  * reconnects.
  *
- * This delegate *MUST* checkSession() before every call, because:
- *
- * 1. The session lazily connects
- * 2. We might need to swap out the adapter if the version has changed
+ * <p>This delegate <b>MUST</b> invoke {@link #checkSession()} before every 
call, because:</p>
  *
+ * <ol>
+ * <li>The session lazily connects</li>
+ * <li>We might need to swap out the adapter if the version has changed</li>
+ * </ol>
  */
 public class CassandraAdapterDelegate implements ICassandraAdapter, 
Host.StateListener
 {
     private final CQLSession cqlSession;
     private final CassandraVersionProvider versionProvider;
-    private Session session;
+    private volatile Session session;
     private SimpleCassandraVersion currentVersion;
     private ICassandraAdapter adapter;
     private volatile boolean isUp = false;
-    private final int refreshRate;
 
     private static final Logger logger = 
LoggerFactory.getLogger(CassandraAdapterDelegate.class);
-    private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
-    private ScheduledFuture<?> healthCheckRoutine;
-    private boolean registered = false;
+    private final AtomicBoolean registered = new AtomicBoolean(false);
+    private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
 
     public CassandraAdapterDelegate(CassandraVersionProvider provider, 
CQLSession cqlSession)
-    {
-        this(provider, cqlSession, 5000);
-    }
-
-    public CassandraAdapterDelegate(CassandraVersionProvider provider, 
CQLSession cqlSession, int refreshRate)
     {
         this.cqlSession = cqlSession;
         this.versionProvider = provider;
-        this.refreshRate = refreshRate;
     }
 
-    public synchronized void start()
+    private void maybeRegisterHostListener(@NotNull Session session)
     {
-        logger.info("Starting health check");
-        // only schedule the health check once.
-        if (healthCheckRoutine == null)
-        {
-            healthCheckRoutine = 
executor.scheduleWithFixedDelay(this::healthCheck,
-                                                                 0,
-                                                                 refreshRate,
-                                                                 
TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private synchronized void maybeRegisterHostListener(@NotNull Session 
session)
-    {
-        if (!registered)
+        if (registered.compareAndSet(false, true))
         {
             session.getCluster().register(this);
-            registered = true;
         }
     }
 
-    private synchronized void maybeUnregisterHostListener(@NotNull Session 
session)
+    private void maybeUnregisterHostListener(@NotNull Session session)
     {
-        if (registered)
+        if (registered.compareAndSet(true, false))
         {
             session.getCluster().unregister(this);
-            registered = false;
         }
     }
 
-    public synchronized void stop()
-    {
-        logger.info("Stopping health check");
-        executor.shutdown();
-    }
-
     /**
      * Make an attempt to obtain the session object.
      *
-     * It needs to be called before routing the request to the adapter
-     * We might end up swapping the adapter out because of a server upgrade
+     * <p>It needs to be called before routing the request to the adapter
+     * We might end up swapping the adapter out because of a server upgrade</p>
      */
-    public synchronized void checkSession()
+    public void checkSession()
     {
-        if (session == null)
+        if (session != null)
         {
-            session = cqlSession.getLocalCql();
+            return;
+        }
+
+        synchronized (this)
+        {
+            if (session == null)
+            {
+                session = cqlSession.getLocalCql();
+            }
         }
     }
 
@@ -127,26 +104,46 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
      * Should be called on initial connect as well as when a server comes back 
since it might be from an upgrade
      * synchronized so we don't flood the DB with version requests
      *
-     * If the healthcheck determines we've changed versions, it should load 
the proper adapter
+     * <p>If the healthcheck determines we've changed versions, it should load 
the proper adapter</p>
      */
-    public synchronized void healthCheck()
+    public void healthCheck()
+    {
+        if (isHealthCheckActive.compareAndSet(false, true))
+        {
+            try
+            {
+                healthCheckInternal();
+            }
+            finally
+            {
+                isHealthCheckActive.set(false);
+            }
+        }
+        else
+        {
+            logger.debug("Skipping health check because there's an active 
check at the moment");
+        }
+    }
+
+    private void healthCheckInternal()
     {
         checkSession();
 
-        if (session == null)
+        Session activeSession = session;
+        if (activeSession == null)
         {
             logger.info("No local CQL session is available. Cassandra is down 
presumably.");
             isUp = false;
             return;
         }
 
-        maybeRegisterHostListener(session);
+        maybeRegisterHostListener(activeSession);
 
         try
         {
-            String version = session.execute("select release_version from 
system.local")
-                    .one()
-                    .getString("release_version");
+            String version = activeSession.execute("select release_version 
from system.local")
+                                          .one()
+                                          .getString("release_version");
             isUp = true;
             // this might swap the adapter out
             SimpleCassandraVersion newVersion = 
SimpleCassandraVersion.create(version);
@@ -164,7 +161,7 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
             // The cassandra node is down.
             // Unregister the host listener and nullify the session in order 
to get a new object.
             isUp = false;
-            maybeUnregisterHostListener(session);
+            maybeUnregisterHostListener(activeSession);
             session = null;
         }
     }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java 
b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index 3355d5d..25924fb 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpServer;
 import org.apache.cassandra.sidecar.utils.SslUtils;
 
@@ -37,12 +38,15 @@ import org.apache.cassandra.sidecar.utils.SslUtils;
 public class CassandraSidecarDaemon
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CassandraSidecarDaemon.class);
+    private final Vertx vertx;
     private final HttpServer server;
     private final Configuration config;
+    private long healthCheckTimerId;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(Vertx vertx, HttpServer server, 
Configuration config)
     {
+        this.vertx = vertx;
         this.server = server;
         this.config = config;
     }
@@ -53,14 +57,14 @@ public class CassandraSidecarDaemon
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), 
config.getPort());
         server.listen(config.getPort(), config.getHost());
-        this.config.getInstancesConfig().instances().forEach(instanceMetadata 
-> instanceMetadata.delegate().start());
+        healthCheckTimerId = 
vertx.setPeriodic(config.getHealthCheckFrequencyMillis(), this::healthCheck);
     }
 
     public void stop()
     {
         logger.info("Stopping Cassandra Sidecar");
         server.close();
-        this.config.getInstancesConfig().instances().forEach(instanceMetadata 
-> instanceMetadata.delegate().stop());
+        vertx.cancelTimer(healthCheckTimerId);
     }
 
     private void banner(PrintStream out)
@@ -97,6 +101,20 @@ public class CassandraSidecarDaemon
 
     }
 
+    /**
+     * Checks the health of every instance configured in the {@link 
Configuration#getInstancesConfig()}.
+     * The health check is executed in a blocking thread to prevent the 
event-loop threads from blocking.
+     *
+     * @param timerId the ID of the periodic timer
+     */
+    private void healthCheck(Long timerId)
+    {
+        config.getInstancesConfig()
+              .instances()
+              .forEach(instanceMetadata ->
+                       vertx.executeBlocking(promise -> 
instanceMetadata.delegate().healthCheck()));
+    }
+
 
     public static void main(String[] args)
     {
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
 
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index 7e7d31b..f63d613 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -45,7 +45,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
         this.dataDirs = dataDirs;
 
         this.session = new CQLSession(host, port, healthCheckFrequencyMillis);
-        this.delegate = new CassandraAdapterDelegate(versionProvider, session, 
healthCheckFrequencyMillis);
+        this.delegate = new CassandraAdapterDelegate(versionProvider, session);
     }
 
     public int id()
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java 
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 5006909..1a6393a 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -39,7 +39,6 @@ import 
org.apache.cassandra.sidecar.common.MockCassandraFactory;
 import org.apache.cassandra.sidecar.common.TestValidationConfiguration;
 import org.apache.cassandra.sidecar.common.utils.ValidationConfiguration;
 
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -107,7 +106,6 @@ public class TestModule extends AbstractModule
 
         CassandraAdapterDelegate delegate = 
mock(CassandraAdapterDelegate.class);
         when(delegate.isUp()).thenReturn(isUp);
-        doNothing().when(delegate).start();
         when(instanceMeta.delegate()).thenReturn(delegate);
         return instanceMeta;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to