This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5471b66 CASSANDRASC-44 Refactor health check to use vertx timer
5471b66 is described below
commit 5471b66c1e69e057bbbe75e4ffe67c1891cd9495
Author: Francisco Guerrero <[email protected]>
AuthorDate: Mon Oct 10 14:35:12 2022 -0700
CASSANDRASC-44 Refactor health check to use vertx timer
Vertx API offers a periodic timer that integrates with it's internal thead
pooling
mechanism. In this commit, we utilize vertx's periodic timer in favor of
using a
`Executors.newSingleThreadScheduledExecutor()` on each delegate.
Another benefit of this approach is that if the cluster topology changes,
i.e.
node replacement, cluster expansion / shrink, then the health checks will
be performed
against the actual nodes in the cluster, assuming we receive an updated
view of the
cluster when invoking the `Configuration#getInstancesConfig()#instances()`
method.
We no longer need to worry about decommissioning the single thread
executors running
on each delegate.
patch by Francisco Guerrero; reviewed by Yifan Cai, Dinesh Joshi for
CASSANDRASC-44
---
.../sidecar/common/CassandraAdapterDelegate.java | 111 ++++++++++-----------
.../cassandra/sidecar/CassandraSidecarDaemon.java | 24 ++++-
.../cluster/instance/InstanceMetadataImpl.java | 2 +-
.../org/apache/cassandra/sidecar/TestModule.java | 2 -
4 files changed, 76 insertions(+), 63 deletions(-)
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
index 21a9c8a..ee3b0ce 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
@@ -19,10 +19,7 @@
package org.apache.cassandra.sidecar.common;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,87 +36,67 @@ import org.jetbrains.annotations.NotNull;
* of the underlying Cassandra adapter. If a server reboots, we can swap out
the right Adapter when the driver
* reconnects.
*
- * This delegate *MUST* checkSession() before every call, because:
- *
- * 1. The session lazily connects
- * 2. We might need to swap out the adapter if the version has changed
+ * <p>This delegate <b>MUST</b> invoke {@link #checkSession()} before every
call, because:</p>
*
+ * <ol>
+ * <li>The session lazily connects</li>
+ * <li>We might need to swap out the adapter if the version has changed</li>
+ * </ol>
*/
public class CassandraAdapterDelegate implements ICassandraAdapter,
Host.StateListener
{
private final CQLSession cqlSession;
private final CassandraVersionProvider versionProvider;
- private Session session;
+ private volatile Session session;
private SimpleCassandraVersion currentVersion;
private ICassandraAdapter adapter;
private volatile boolean isUp = false;
- private final int refreshRate;
private static final Logger logger =
LoggerFactory.getLogger(CassandraAdapterDelegate.class);
- private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
- private ScheduledFuture<?> healthCheckRoutine;
- private boolean registered = false;
+ private final AtomicBoolean registered = new AtomicBoolean(false);
+ private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
public CassandraAdapterDelegate(CassandraVersionProvider provider,
CQLSession cqlSession)
- {
- this(provider, cqlSession, 5000);
- }
-
- public CassandraAdapterDelegate(CassandraVersionProvider provider,
CQLSession cqlSession, int refreshRate)
{
this.cqlSession = cqlSession;
this.versionProvider = provider;
- this.refreshRate = refreshRate;
}
- public synchronized void start()
+ private void maybeRegisterHostListener(@NotNull Session session)
{
- logger.info("Starting health check");
- // only schedule the health check once.
- if (healthCheckRoutine == null)
- {
- healthCheckRoutine =
executor.scheduleWithFixedDelay(this::healthCheck,
- 0,
- refreshRate,
-
TimeUnit.MILLISECONDS);
- }
- }
-
- private synchronized void maybeRegisterHostListener(@NotNull Session
session)
- {
- if (!registered)
+ if (registered.compareAndSet(false, true))
{
session.getCluster().register(this);
- registered = true;
}
}
- private synchronized void maybeUnregisterHostListener(@NotNull Session
session)
+ private void maybeUnregisterHostListener(@NotNull Session session)
{
- if (registered)
+ if (registered.compareAndSet(true, false))
{
session.getCluster().unregister(this);
- registered = false;
}
}
- public synchronized void stop()
- {
- logger.info("Stopping health check");
- executor.shutdown();
- }
-
/**
* Make an attempt to obtain the session object.
*
- * It needs to be called before routing the request to the adapter
- * We might end up swapping the adapter out because of a server upgrade
+ * <p>It needs to be called before routing the request to the adapter
+ * We might end up swapping the adapter out because of a server upgrade</p>
*/
- public synchronized void checkSession()
+ public void checkSession()
{
- if (session == null)
+ if (session != null)
{
- session = cqlSession.getLocalCql();
+ return;
+ }
+
+ synchronized (this)
+ {
+ if (session == null)
+ {
+ session = cqlSession.getLocalCql();
+ }
}
}
@@ -127,26 +104,46 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
* Should be called on initial connect as well as when a server comes back
since it might be from an upgrade
* synchronized so we don't flood the DB with version requests
*
- * If the healthcheck determines we've changed versions, it should load
the proper adapter
+ * <p>If the healthcheck determines we've changed versions, it should load
the proper adapter</p>
*/
- public synchronized void healthCheck()
+ public void healthCheck()
+ {
+ if (isHealthCheckActive.compareAndSet(false, true))
+ {
+ try
+ {
+ healthCheckInternal();
+ }
+ finally
+ {
+ isHealthCheckActive.set(false);
+ }
+ }
+ else
+ {
+ logger.debug("Skipping health check because there's an active
check at the moment");
+ }
+ }
+
+ private void healthCheckInternal()
{
checkSession();
- if (session == null)
+ Session activeSession = session;
+ if (activeSession == null)
{
logger.info("No local CQL session is available. Cassandra is down
presumably.");
isUp = false;
return;
}
- maybeRegisterHostListener(session);
+ maybeRegisterHostListener(activeSession);
try
{
- String version = session.execute("select release_version from
system.local")
- .one()
- .getString("release_version");
+ String version = activeSession.execute("select release_version
from system.local")
+ .one()
+ .getString("release_version");
isUp = true;
// this might swap the adapter out
SimpleCassandraVersion newVersion =
SimpleCassandraVersion.create(version);
@@ -164,7 +161,7 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
// The cassandra node is down.
// Unregister the host listener and nullify the session in order
to get a new object.
isUp = false;
- maybeUnregisterHostListener(session);
+ maybeUnregisterHostListener(activeSession);
session = null;
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index 3355d5d..25924fb 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import org.apache.cassandra.sidecar.utils.SslUtils;
@@ -37,12 +38,15 @@ import org.apache.cassandra.sidecar.utils.SslUtils;
public class CassandraSidecarDaemon
{
private static final Logger logger =
LoggerFactory.getLogger(CassandraSidecarDaemon.class);
+ private final Vertx vertx;
private final HttpServer server;
private final Configuration config;
+ private long healthCheckTimerId;
@Inject
- public CassandraSidecarDaemon(HttpServer server, Configuration config)
+ public CassandraSidecarDaemon(Vertx vertx, HttpServer server,
Configuration config)
{
+ this.vertx = vertx;
this.server = server;
this.config = config;
}
@@ -53,14 +57,14 @@ public class CassandraSidecarDaemon
validate();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(),
config.getPort());
server.listen(config.getPort(), config.getHost());
- this.config.getInstancesConfig().instances().forEach(instanceMetadata
-> instanceMetadata.delegate().start());
+ healthCheckTimerId =
vertx.setPeriodic(config.getHealthCheckFrequencyMillis(), this::healthCheck);
}
public void stop()
{
logger.info("Stopping Cassandra Sidecar");
server.close();
- this.config.getInstancesConfig().instances().forEach(instanceMetadata
-> instanceMetadata.delegate().stop());
+ vertx.cancelTimer(healthCheckTimerId);
}
private void banner(PrintStream out)
@@ -97,6 +101,20 @@ public class CassandraSidecarDaemon
}
+ /**
+ * Checks the health of every instance configured in the {@link
Configuration#getInstancesConfig()}.
+ * The health check is executed in a blocking thread to prevent the
event-loop threads from blocking.
+ *
+ * @param timerId the ID of the periodic timer
+ */
+ private void healthCheck(Long timerId)
+ {
+ config.getInstancesConfig()
+ .instances()
+ .forEach(instanceMetadata ->
+ vertx.executeBlocking(promise ->
instanceMetadata.delegate().healthCheck()));
+ }
+
public static void main(String[] args)
{
diff --git
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index 7e7d31b..f63d613 100644
---
a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -45,7 +45,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
this.dataDirs = dataDirs;
this.session = new CQLSession(host, port, healthCheckFrequencyMillis);
- this.delegate = new CassandraAdapterDelegate(versionProvider, session,
healthCheckFrequencyMillis);
+ this.delegate = new CassandraAdapterDelegate(versionProvider, session);
}
public int id()
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 5006909..1a6393a 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -39,7 +39,6 @@ import
org.apache.cassandra.sidecar.common.MockCassandraFactory;
import org.apache.cassandra.sidecar.common.TestValidationConfiguration;
import org.apache.cassandra.sidecar.common.utils.ValidationConfiguration;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -107,7 +106,6 @@ public class TestModule extends AbstractModule
CassandraAdapterDelegate delegate =
mock(CassandraAdapterDelegate.class);
when(delegate.isUp()).thenReturn(isUp);
- doNothing().when(delegate).start();
when(instanceMeta.delegate()).thenReturn(delegate);
return instanceMeta;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]