This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36d48b2  CASSANDRASC-80: HealthCheckPeriodicTask execute never 
completes the promise when instances are empty
36d48b2 is described below

commit 36d48b209c84470930d5ce3d4f4ab1c406dae60e
Author: Francisco Guerrero <[email protected]>
AuthorDate: Fri Oct 27 09:38:11 2023 -0700

    CASSANDRASC-80: HealthCheckPeriodicTask execute never completes the promise 
when instances are empty
    
    When the HealthCheckPeriodicTask executes and the instances are null or 
empty, the promise never completes.
    This prevents subsequent scheduled health checks from taking place because the 
PeriodicTaskExecutor will
    schedule a new task only if no other tasks are active. This causes the 
HealthCheckPeriodicTask to never
    perform health checks when this condition is encountered.
    
    In this commit, we fix the issue by completing the promise when this 
condition is encountered.
    
    Patch by Francisco Guerrero; Reviewed by Dinesh Joshi, Yifan Cai for 
CASSANDRASC-80
---
 CHANGES.txt                                        |  1 +
 build.gradle                                       |  2 +
 .../cassandra/sidecar/cluster/InstancesConfig.java |  5 +++
 .../sidecar/cluster/InstancesConfigImpl.java       |  3 +-
 .../sidecar/tasks/HealthCheckPeriodicTask.java     | 47 +++++++++++-----------
 .../cassandra/sidecar/tasks/PeriodicTask.java      |  4 ++
 .../cassandra/sidecar/IntegrationTestModule.java   |  2 +
 .../sidecar/tasks/HealthCheckPeriodicTaskTest.java | 13 +++++-
 8 files changed, 51 insertions(+), 26 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c87d1ad..256b4bd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * HealthCheckPeriodicTask execute never completes the promise when instances 
are empty (CASSANDRASC-80)
  * Fix token-ranges endpoint to handle gossip-info responses without 'status' 
(CASSANDRASC-78)
  * Upgrade vertx to version 4.4.6 to bring hot reloading and traffic shaping 
options (CASSANDRASC-77)
  * Fix unable to stream secondary index files (CASSANDRASC-74)
diff --git a/build.gradle b/build.gradle
index 3303d6f..d733494 100644
--- a/build.gradle
+++ b/build.gradle
@@ -169,6 +169,8 @@ configurations {
 
 dependencies {
     compileOnly('org.jetbrains:annotations:23.0.0')
+    testCompileOnly('org.jetbrains:annotations:23.0.0')
+    integrationTestCompileOnly('org.jetbrains:annotations:23.0.0')
 
     implementation("io.vertx:vertx-web:${project.vertxVersion}") {
         exclude group: 'junit', module: 'junit'
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfig.java 
b/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfig.java
index cf19a06..667ad90 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfig.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfig.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Maintains metadata of instances maintained by Sidecar.
@@ -29,8 +30,12 @@ import 
org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 public interface InstancesConfig
 {
     /**
+     * Returns metadata associated with the Cassandra instances managed by 
this Sidecar. The implementer
+     * must return a non-null value. When no Cassandra instances are 
configured, an empty list can be returned.
+     *
      * @return metadata of instances owned by the sidecar
      */
+    @NotNull
     List<InstanceMetadata> instances();
 
     /**
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfigImpl.java 
b/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfigImpl.java
index 2724cce..dfd830f 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfigImpl.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/cluster/InstancesConfigImpl.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Local implementation of InstancesConfig.
@@ -57,7 +58,7 @@ public class InstancesConfigImpl implements InstancesConfig
     }
 
     @Override
-    public List<InstanceMetadata> instances()
+    public @NotNull List<InstanceMetadata> instances()
     {
         return instanceMetadataList;
     }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTask.java 
b/src/main/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTask.java
index 84cf07a..28eefab 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTask.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTask.java
@@ -19,16 +19,16 @@
 package org.apache.cassandra.sidecar.tasks;
 
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.EventBus;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
-import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 
@@ -82,28 +82,27 @@ public class HealthCheckPeriodicTask implements PeriodicTask
     @Override
     public void execute(Promise<Void> promise)
     {
-        List<InstanceMetadata> instances = instancesConfig.instances();
-        AtomicInteger counter = new AtomicInteger(instances.size());
-        instances.forEach(instanceMetadata -> internalPool.executeBlocking(p 
-> {
-            try
-            {
-                instanceMetadata.delegate().healthCheck();
-                p.complete();
-            }
-            catch (Throwable cause)
-            {
-                p.fail(cause);
-                LOGGER.error("Unable to complete health check on instance={}",
-                             instanceMetadata.id(), cause);
-            }
-            finally
-            {
-                if (counter.decrementAndGet() == 0)
-                {
-                    promise.tryComplete();
-                }
-            }
-        }, false));
+        List<Future<?>> futures = instancesConfig.instances()
+                                                 .stream()
+                                                 .map(instanceMetadata -> 
internalPool.executeBlocking(p -> {
+                                                     try
+                                                     {
+                                                         
instanceMetadata.delegate().healthCheck();
+                                                         p.complete();
+                                                     }
+                                                     catch (Throwable cause)
+                                                     {
+                                                         p.fail(cause);
+                                                         LOGGER.error("Unable 
to complete health check on instance={}",
+                                                                      
instanceMetadata.id(), cause);
+                                                     }
+                                                 }, false))
+                                                 .collect(Collectors.toList());
+
+        // join always waits until all its futures are completed and will not 
fail as soon as one of the futures fails
+        Future.join(futures)
+              .onSuccess(v -> promise.complete())
+              .onFailure(promise::fail);
     }
 
     @Override
diff --git a/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java 
b/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
index 7021e53..4b21127 100644
--- a/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
@@ -60,6 +60,10 @@ public interface PeriodicTask
      * Defines the task body.
      * The method can be considered as executing in a single thread.
      *
+     * <br><b>NOTE:</b> the {@code promise} must be completed (as either 
succeeded or failed) at the end of the run.
+     * Failing to do so, the {@link PeriodicTaskExecutor} will not be able to 
schedule a new run.
+     * See {@link PeriodicTaskExecutor#executeInternal} for details.
+     *
      * @param promise a promise when the execution completes
      */
     void execute(Promise<Void> promise);
diff --git 
a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java 
b/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
index 7882237..0564f61 100644
--- 
a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
+++ 
b/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
@@ -34,6 +34,7 @@ import 
org.apache.cassandra.sidecar.config.yaml.HealthCheckConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import org.apache.cassandra.sidecar.test.CassandraSidecarTestContext;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Provides the basic dependencies for integration tests
@@ -59,6 +60,7 @@ public class IntegrationTestModule extends AbstractModule
         /**
          * @return metadata of instances owned by the sidecar
          */
+        @NotNull
         public List<InstanceMetadata> instances()
         {
             if (cassandraTestContext != null && 
cassandraTestContext.isClusterBuilt())
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTaskTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTaskTest.java
index ea3a770..c4695ca 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTaskTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTaskTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.sidecar.tasks;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -81,6 +82,16 @@ class HealthCheckPeriodicTaskTest
         assertThat(healthCheck.name()).isEqualTo("Health Check");
     }
 
+    @Test
+    void 
testHealthCheckPromiseCompletesWhenNoInstancesAreConfigured(VertxTestContext 
context)
+    {
+        List<InstanceMetadata> mockInstanceMetadata = Collections.emptyList();
+        when(mockInstancesConfig.instances()).thenReturn(mockInstanceMetadata);
+        Promise<Void> promise = Promise.promise();
+        healthCheck.execute(promise);
+        promise.future().onComplete(context.succeedingThenComplete());
+    }
+
     @Test
     void testHealthCheckInvokedForAllInstances(VertxTestContext context)
     {
@@ -107,7 +118,7 @@ class HealthCheckPeriodicTaskTest
         when(mockInstancesConfig.instances()).thenReturn(mockInstanceMetadata);
         Promise<Void> promise = Promise.promise();
         healthCheck.execute(promise);
-        promise.future().onComplete(context.succeedingThenComplete());
+        promise.future().onComplete(context.failingThenComplete());
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to