This is an automated email from the ASF dual-hosted git repository.

saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b171b45 CASSSIDECAR-342: Sidecar endpoint for draining a node (#258)
3b171b45 is described below

commit 3b171b45f6f68742e6c2564c8ec4506aa22423c0
Author: Sudipta <[email protected]>
AuthorDate: Tue Oct 28 14:01:27 2025 -0700

    CASSSIDECAR-342: Sidecar endpoint for draining a node (#258)
    
    Patch by Sudipta Laha; reviewed by Francisco Guerrero, Arjun Ashok, Saranya 
Krishnakumar for CASSSIDECAR-342
---
 CHANGES.txt                                        |   1 +
 .../adapters/base/CassandraStorageOperations.java  |  10 +
 .../jmx/GossipDependentStorageJmxOperations.java   |   6 +
 .../adapters/base/jmx/StorageJmxOperations.java    |   5 +
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   1 +
 .../sidecar/common/request/NodeDrainRequest.java   |  46 ++++
 .../cassandra/sidecar/client/RequestContext.java   |  13 +
 .../cassandra/sidecar/client/SidecarClient.java    |  14 ++
 .../sidecar/client/SidecarClientTest.java          |  19 ++
 .../CassandraNodeOperationsIntegrationTest.java    | 163 +++++++++++++
 .../sidecar/common/server/StorageOperations.java   |   5 +
 .../acl/authorization/BasicPermissions.java        |   1 +
 .../sidecar/handlers/NodeDecommissionHandler.java  |  25 +-
 ...ommissionHandler.java => NodeDrainHandler.java} |  54 ++---
 .../sidecar/handlers/OperationalJobHandler.java    |   2 +-
 .../apache/cassandra/sidecar/job/NodeDrainJob.java | 142 +++++++++++
 .../sidecar/job/OperationalJobManager.java         |  43 +++-
 .../sidecar/modules/CassandraOperationsModule.java |  21 ++
 .../modules/multibindings/VertxRouteMapKeys.java   |   5 +
 .../sidecar/utils/OperationalJobUtils.java         |  22 +-
 .../sidecar/handlers/NodeDrainHandlerTest.java     | 243 +++++++++++++++++++
 .../cassandra/sidecar/job/NodeDrainJobTest.java    | 268 +++++++++++++++++++++
 .../sidecar/job/OperationalJobManagerTest.java     |  77 +++---
 23 files changed, 1087 insertions(+), 99 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4decb80f..21a0bcc1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Sidecar endpoint for draining a node (CASSSIDECAR-342)
  * Avoid resuming stream early during SSTable upload (CASSSIDECAR-359)
  * Add cache for Authorization layer (CASSSIDECAR-357)
  * Avoid creating objects in the CassandraAdapter implementation 
(CASSSIDECAR-355)
diff --git 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index 7bfdfdd4..db8b14b1 100644
--- 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
+++ 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -236,6 +236,16 @@ public class CassandraStorageOperations implements 
StorageOperations
                  .decommission(force);
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void drain() throws IOException, InterruptedException, 
ExecutionException
+    {
+        jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
+                 .drain();
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
index a40255ed..fb753237 100644
--- 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
+++ 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
@@ -172,6 +172,12 @@ public class GossipDependentStorageJmxOperations 
implements StorageJmxOperations
         delegate.decommission(force);
     }
 
+    @Override
+    public void drain() throws IOException, InterruptedException, 
ExecutionException
+    {
+        delegate.drain();
+    }
+
     @Override
     public String getClusterName()
     {
diff --git 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
index c76ccce6..11e838c4 100644
--- 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
+++ 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
@@ -168,6 +168,11 @@ public interface StorageJmxOperations
      */
     void decommission(boolean force) throws IllegalStateException, 
IllegalArgumentException, UnsupportedOperationException;
 
+    /**
+     * Triggers the node drain operation
+     */
+    void drain() throws IOException, InterruptedException, ExecutionException;
+
     /**
      * Fetch the operation-mode of the node
      *
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index d67e4cdc..923342f5 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -140,6 +140,7 @@ public final class ApiEndpointsV1
     public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + 
CASSANDRA + OPERATIONAL_JOBS;
     public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + 
PER_OPERATIONAL_JOB;
     public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + 
"/operations/decommission";
+    public static final String NODE_DRAIN_ROUTE = API_V1 + CASSANDRA + 
"/operations/drain";
     public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + 
"/stats/streams";
     public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + 
PER_KEYSPACE + PER_TABLE + "/stats";
 
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDrainRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDrainRequest.java
new file mode 100644
index 00000000..4ebd1818
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDrainRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+
+/**
+ * Represents a request to execute node drain operation
+ */
+public class NodeDrainRequest extends JsonRequest<OperationalJobResponse>
+{
+    /**
+     * Constructs a request to execute a node drain operation
+     */
+    public NodeDrainRequest()
+    {
+        super(ApiEndpointsV1.NODE_DRAIN_ROUTE);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.PUT;
+    }
+}
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 2e630c31..4bf13bd6 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -45,6 +45,7 @@ import 
org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest;
 import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
 import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest;
 import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest;
+import org.apache.cassandra.sidecar.common.request.NodeDrainRequest;
 import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
 import org.apache.cassandra.sidecar.common.request.OperationalJobRequest;
 import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest;
@@ -89,6 +90,7 @@ public class RequestContext
     protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new 
GossipInfoRequest();
     protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new 
ListOperationalJobsRequest();
     protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = 
new NodeDecommissionRequest();
+    protected static final NodeDrainRequest NODE_DRAIN_REQUEST = new 
NodeDrainRequest();
 
     protected static final StreamStatsRequest STREAM_STATS_REQUEST = new 
StreamStatsRequest();
     protected static final LifecycleInfoRequest LIFECYCLE_INFO_REQUEST = new 
LifecycleInfoRequest();
@@ -582,6 +584,17 @@ public class RequestContext
             return request(NODE_DECOMMISSION_REQUEST);
         }
 
+        /**
+         * Sets the {@code request} to be a {@link NodeDrainRequest} and 
returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @return a reference to this Builder
+         */
+        public Builder nodeDrainRequest()
+        {
+            return request(NODE_DRAIN_REQUEST);
+        }
+
         /**
          * Sets the {@code request} to be a {@link GossipUpdateRequest} for the
          * given {@link NodeCommandRequestPayload.State state}, and returns a 
reference to this Builder enabling method chaining.
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 373bd336..9ce315fb 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -823,6 +823,20 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                             .build());
     }
 
+    /**
+     * Executes the node drain request using the default retry policy and 
configured selection policy
+     *
+     * @param instance the instance where the request will be executed
+     * @return a completable future of the operational job response for the drain job
+     */
+    public CompletableFuture<OperationalJobResponse> nodeDrain(SidecarInstance 
instance)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .nodeDrainRequest()
+                                            .build());
+    }
+
     /**
      * Sends a request to start or stop Cassandra gossiping on the provided 
instance.
      * <p>
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index b13a209b..5071d0e7 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -1393,6 +1393,25 @@ abstract class SidecarClientTest
         validateResponseServed(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE);
     }
 
+    @Test
+    public void testNodeDrain() throws Exception
+    {
+        UUID jobId = UUID.randomUUID();
+        String nodeDrainString = "{\"jobId\":\"" + jobId + 
"\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}";
+
+        MockResponse response = new MockResponse()
+                                .setResponseCode(OK.code())
+                                .setHeader("content-type", "application/json")
+                                .setBody(nodeDrainString);
+        enqueue(response);
+
+        SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(servers.get(0));
+        OperationalJobResponse result = 
client.nodeDrain(sidecarInstance).get(30, TimeUnit.SECONDS);
+        assertThat(result).isNotNull();
+        assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+        validateResponseServed(ApiEndpointsV1.NODE_DRAIN_ROUTE);
+    }
+
     @Test
     void testFailsWithOneAttemptPerServer()
     {
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
new file mode 100644
index 00000000..8557b451
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpResponse;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for Cassandra node drain operations
+ */
+public class CassandraNodeOperationsIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
+{
+    public static final String CASSANDRA_VERSION_4_0 = "4.0";
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        // No schema init needed
+    }
+
+    @Override
+    protected void beforeTestStart()
+    {
+        // wait for the schema initialization
+        waitForSchemaReady(30, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testNodeDrainOperationSuccess()
+    {
+        // Initiate drain operation
+        HttpResponse<Buffer> drainResponse = getBlocking(
+        trustedClient().put(serverWrapper.serverPort, "localhost", 
ApiEndpointsV1.NODE_DRAIN_ROUTE)
+                       .send());
+
+        assertThat(drainResponse.statusCode()).isEqualTo(OK.code());
+
+        JsonObject responseBody = drainResponse.bodyAsJsonObject();
+        assertThat(responseBody).isNotNull();
+        assertThat(responseBody.getString("jobId")).isNotNull();
+        assertThat(responseBody.getString("jobStatus")).isIn(
+        OperationalJobStatus.CREATED.name(),
+        OperationalJobStatus.RUNNING.name(),
+        OperationalJobStatus.SUCCEEDED.name()
+        );
+
+        loopAssert(30, 500, () -> {
+            // Verify node status is DRAINED by checking the operationMode via 
stream stats endpoint
+            HttpResponse<Buffer> streamStatsResponse = getBlocking(
+            trustedClient().get(serverWrapper.serverPort, "localhost", 
ApiEndpointsV1.STREAM_STATS_ROUTE)
+                           .send());
+
+            assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code());
+
+            JsonObject streamStats = streamStatsResponse.bodyAsJsonObject();
+            assertThat(streamStats).isNotNull();
+            
assertThat(streamStats.getString("operationMode")).isEqualTo("DRAINED");
+        });
+
+        // Validate the operational job status using the OperationalJobHandler
+        String jobId = responseBody.getString("jobId");
+        validateOperationalJobStatus(jobId, "drain");
+    }
+
+    /**
+     * Validates the operational job status by querying the 
OperationalJobHandler endpoint
+     * and waiting for the job to reach a final state if necessary.
+     *
+     * @param jobId the ID of the operational job to validate
+     * @param expectedOperation the expected operation name (e.g., "move", 
"decommission", "drain")
+     */
+    private void validateOperationalJobStatus(String jobId, String 
expectedOperation)
+    {
+        String operationalJobRoute = 
ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId);
+
+        HttpResponse<Buffer> jobStatusResponse = getBlocking(
+        trustedClient().get(serverWrapper.serverPort, "localhost", 
operationalJobRoute)
+                       .send());
+
+        assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code());
+
+        JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject();
+        assertThat(jobStatusBody).isNotNull();
+        assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId);
+        
assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation);
+        assertThat(jobStatusBody.getString("jobStatus")).isIn(
+        OperationalJobStatus.RUNNING.name(),
+        OperationalJobStatus.SUCCEEDED.name()
+        );
+
+        // If the job is still running, wait for it to complete or reach a 
final state
+        if 
(OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus")))
+        {
+            loopAssert(30, 500, () -> {
+                HttpResponse<Buffer> finalJobStatusResponse = getBlocking(
+                trustedClient().get(serverWrapper.serverPort, "localhost", 
operationalJobRoute)
+                               .send());
+
+                
assertThat(finalJobStatusResponse.statusCode()).isEqualTo(OK.code());
+
+                JsonObject finalJobStatusBody = 
finalJobStatusResponse.bodyAsJsonObject();
+                assertThat(finalJobStatusBody).isNotNull();
+                assertThat(finalJobStatusBody.getString("jobStatus")).isIn(
+                OperationalJobStatus.SUCCEEDED.name(),
+                OperationalJobStatus.FAILED.name()
+                );
+            });
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        catch (IllegalStateException ex)
+        {
+            logger.error("Exception in tear down", ex);
+            // When cluster.close() is called after drain, for Cassandra 4.0
+            // it throws IllegalStateException "HintsService has already been 
shut down".
+            if (!CASSANDRA_VERSION_4_0.equals(this.testVersion.version()))
+            {
+                throw ex;
+            }
+            logger.warn("Suppressing {} for Cassandra version {}",
+                        ex.getClass().getCanonicalName(), 
CASSANDRA_VERSION_4_0);
+        }
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
index d135e61d..021afa52 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
@@ -118,6 +118,11 @@ public interface StorageOperations
      */
     void decommission(boolean force);
 
+    /**
+     * Triggers the node drain operation
+     */
+    void drain() throws IOException, InterruptedException, ExecutionException;
+
     /**
      * @return returns true if gossip is running, false otherwise
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index b57ac623..1247739e 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -62,6 +62,7 @@ public class BasicPermissions
     // sidecar operation related permissions
     public static final Permission READ_OPERATIONAL_JOB = new 
DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE);
     public static final Permission DECOMMISSION_NODE = new 
DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE);
+    public static final Permission DRAIN_NODE = new 
DomainAwarePermission("NODE:DRAIN", OPERATION_SCOPE);
 
     // Permissions related to Schema Reporting
     public static final Permission REPORT_SCHEMA = new 
DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE);
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
index 63b5b66e..d2c522eb 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
@@ -23,18 +23,14 @@ import java.util.Set;
 
 import com.datastax.driver.core.utils.UUIDs;
 import com.google.inject.Inject;
-import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.auth.authorization.Authorization;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
 import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
 import org.apache.cassandra.sidecar.job.OperationalJobManager;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
@@ -89,22 +85,11 @@ public class NodeDecommissionHandler extends 
AbstractHandler<Boolean> implements
     {
         StorageOperations operations = 
metadataFetcher.delegate(host).storageOperations();
         NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), 
operations, isForce);
-        try
-        {
-            jobManager.trySubmitJob(job);
-        }
-        catch (OperationalJobConflictException oje)
-        {
-            String reason = oje.getMessage();
-            logger.error("Conflicting job encountered. reason={}", reason);
-            
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
-            context.json(new OperationalJobResponse(job.jobId(), 
OperationalJobStatus.FAILED, job.name(), reason));
-            return;
-        }
-
-        // Get the result, waiting for the specified wait time for result
-        job.asyncResult(executorPools.service(), 
config.operationalJobExecutionMaxWaitTime())
-           .onComplete(v -> 
OperationalJobUtils.sendStatusBasedResponse(context, job));
+        this.jobManager.trySubmitJob(job,
+                                     (completedJob, exception) ->
+                                     
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
+                                     executorPools.service(),
+                                     
config.operationalJobExecutionMaxWaitTime());
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java
similarity index 57%
copy from 
server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
copy to 
server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java
index 63b5b66e..b0de6753 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java
@@ -23,31 +23,25 @@ import java.util.Set;
 
 import com.datastax.driver.core.utils.UUIDs;
 import com.google.inject.Inject;
-import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.auth.authorization.Authorization;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
-import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
+import org.apache.cassandra.sidecar.job.NodeDrainJob;
 import org.apache.cassandra.sidecar.job.OperationalJobManager;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
 import org.jetbrains.annotations.NotNull;
 
-import static 
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
-
 /**
- * Provides REST API for asynchronously decommissioning the corresponding 
Cassandra node
+ * Provides REST API for asynchronously draining the corresponding Cassandra 
node
  */
-public class NodeDecommissionHandler extends AbstractHandler<Boolean> 
implements AccessProtected
+public class NodeDrainHandler extends AbstractHandler<Void> implements 
AccessProtected
 {
     private final OperationalJobManager jobManager;
     private final ServiceConfiguration config;
@@ -60,11 +54,11 @@ public class NodeDecommissionHandler extends 
AbstractHandler<Boolean> implements
      * @param validator       a validator instance to validate 
Cassandra-specific input
      */
     @Inject
-    protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
-                                      ExecutorPools executorPools,
-                                      ServiceConfiguration 
serviceConfiguration,
-                                      CassandraInputValidator validator,
-                                      OperationalJobManager jobManager)
+    protected NodeDrainHandler(InstanceMetadataFetcher metadataFetcher,
+                               ExecutorPools executorPools,
+                               ServiceConfiguration serviceConfiguration,
+                               CassandraInputValidator validator,
+                               OperationalJobManager jobManager)
     {
         super(metadataFetcher, executorPools, validator);
         this.jobManager = jobManager;
@@ -74,7 +68,7 @@ public class NodeDecommissionHandler extends 
AbstractHandler<Boolean> implements
     @Override
     public Set<Authorization> requiredAuthorizations()
     {
-        return 
Collections.singleton(BasicPermissions.DECOMMISSION_NODE.toAuthorization());
+        return 
Collections.singleton(BasicPermissions.DRAIN_NODE.toAuthorization());
     }
 
     /**
@@ -85,34 +79,24 @@ public class NodeDecommissionHandler extends 
AbstractHandler<Boolean> implements
                                HttpServerRequest httpRequest,
                                @NotNull String host,
                                SocketAddress remoteAddress,
-                               Boolean isForce)
+                               Void unused)
     {
         StorageOperations operations = 
metadataFetcher.delegate(host).storageOperations();
-        NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), 
operations, isForce);
-        try
-        {
-            jobManager.trySubmitJob(job);
-        }
-        catch (OperationalJobConflictException oje)
-        {
-            String reason = oje.getMessage();
-            logger.error("Conflicting job encountered. reason={}", reason);
-            
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
-            context.json(new OperationalJobResponse(job.jobId(), 
OperationalJobStatus.FAILED, job.name(), reason));
-            return;
-        }
-
-        // Get the result, waiting for the specified wait time for result
-        job.asyncResult(executorPools.service(), 
config.operationalJobExecutionMaxWaitTime())
-           .onComplete(v -> 
OperationalJobUtils.sendStatusBasedResponse(context, job));
+        NodeDrainJob job = new NodeDrainJob(UUIDs.timeBased(), operations);
+        this.jobManager.trySubmitJob(job,
+                                     (completedJob, exception) ->
+                                     
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
+                                     executorPools.service(),
+                                     
config.operationalJobExecutionMaxWaitTime());
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    protected Boolean extractParamsOrThrow(RoutingContext context)
+    protected Void extractParamsOrThrow(RoutingContext context)
     {
-        return parseBooleanQueryParam(context.request(), "force", false);
+        // No parameters needed for drain operation
+        return null;
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
index ac483974..44925c72 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
@@ -85,7 +85,7 @@ public class OperationalJobHandler extends 
AbstractHandler<UUID> implements Acce
                          return job;
                      })
                      .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, jobId))
-                     .onSuccess(job -> 
OperationalJobUtils.sendStatusBasedResponse(context, job));
+                     .onSuccess(job -> 
OperationalJobUtils.sendStatusBasedResponse(context, job, null));
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java 
b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java
new file mode 100644
index 00000000..d6485764
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node drain operation.
+ */
+public class NodeDrainJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(NodeDrainJob.class);
+    private static final String OPERATION = "drain";
+    protected StorageOperations storageOperations;
+
+    /**
+     * Enum representing the various drain states of a Cassandra node.
+     */
+    public enum NodeDrainStateEnum
+    {
+        DRAINING(OperationalJobStatus.RUNNING),
+        DRAINED(OperationalJobStatus.SUCCEEDED);
+
+        final OperationalJobStatus jobStatus;
+
+        NodeDrainStateEnum(OperationalJobStatus jobStatus)
+        {
+            this.jobStatus = jobStatus;
+        }
+
+        /**
+         * Converts an operation mode string to a {@code NodeDrainStateEnum} constant.
+         *
+         * @param operationMode the operation mode string
+         * @return the corresponding NodeDrainStateEnum, or null if the mode is not a drain state
+         */
+        public static NodeDrainStateEnum fromOperationMode(String 
operationMode)
+        {
+            try
+            {
+                return valueOf(operationMode);
+            }
+            catch (IllegalArgumentException | NullPointerException e)
+            {
+                LOGGER.error("No matching NodeDrainStateEnum found for 
Cassandra node status={}. Error={}",
+                             operationMode, e);
+                return null;
+            }
+        }
+    }
+
+    public NodeDrainJob(UUID jobId, StorageOperations storageOps)
+    {
+        super(jobId);
+        this.storageOperations = storageOps;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isRunningOnCassandra()
+    {
+        String operationMode = storageOperations.operationMode();
+        NodeDrainStateEnum nodeDrainStateEnum = 
NodeDrainStateEnum.fromOperationMode(operationMode);
+        return nodeDrainStateEnum == NodeDrainStateEnum.DRAINING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public OperationalJobStatus status()
+    {
+        String operationMode = storageOperations.operationMode();
+        NodeDrainStateEnum nodeDrainStateEnum = 
NodeDrainStateEnum.fromOperationMode(operationMode);
+
+        if (nodeDrainStateEnum != null)
+        {
+            return nodeDrainStateEnum.jobStatus;
+        }
+        return super.status();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void executeInternal()
+    {
+        if (isRunningOnCassandra())
+        {
+            LOGGER.info("Not executing job as an ongoing drain operation was 
found jobId={}", this.jobId());
+            return;
+        }
+
+        LOGGER.info("Executing drain operation. jobId={}", this.jobId());
+        try
+        {
+            storageOperations.drain();
+        }
+        catch (IOException | ExecutionException | InterruptedException ex)
+        {
+            LOGGER.error("Job execution failed. jobId={} reason='{}'", 
this.jobId(), ex.getMessage(), ex);
+            throw OperationalJobException.wraps(ex);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String name()
+    {
+        return OPERATION;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
index 3c37ddaa..6d36f353 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
@@ -20,10 +20,16 @@ package org.apache.cassandra.sidecar.job;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.inject.Inject;
 
 import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
@@ -34,6 +40,7 @@ import 
org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
 @Singleton
 public class OperationalJobManager
 {
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final OperationalJobTracker jobTracker;
 
     private final TaskExecutorPool internalExecutorPool;
@@ -78,19 +85,37 @@ public class OperationalJobManager
      * Try to submit the job to execute asynchronously, if it is not currently 
being
      * tracked and not running. The job is triggered on a separate internal 
thread-pool.
      * The job execution failure behavior is tracked within the {@link 
OperationalJob}.
+     * This method provides a callback mechanism and configurable wait time 
for job completion.
      *
-     * @param job OperationalJob instance to submit
-     * @throws OperationalJobConflictException when the same operational job 
is already running on Cassandra
+     * @param job                 OperationalJob instance to submit
+     * @param onComplete          callback invoked when the job completes, or invoked
+     *                            immediately with an OperationalJobConflictException if submission is rejected
+     * @param serviceExecutorPool the executor pool to use for waiting on job 
completion
+     * @param waitTime            the maximum time to wait for job completion 
before returning
      */
-    public void trySubmitJob(OperationalJob job) throws 
OperationalJobConflictException
+    public void trySubmitJob(OperationalJob job,
+                             BiConsumer<OperationalJob, 
OperationalJobConflictException> onComplete,
+                             TaskExecutorPool serviceExecutorPool,
+                             DurationSpec waitTime)
     {
-        checkConflict(job);
+        try
+        {
+            checkConflict(job);
+
+            // New job is submitted for all cases when we do not have a 
corresponding downstream job
+            jobTracker.computeIfAbsent(job.jobId(), jobId -> {
+                internalExecutorPool.executeBlocking(job::execute);
+                return job;
+            });
+        }
+        catch (OperationalJobConflictException oje)
+        {
+            onComplete.accept(job, oje);
+            return;
+        }
 
-        // New job is submitted for all cases when we do not have a 
corresponding downstream job
-        jobTracker.computeIfAbsent(job.jobId(), jobId -> {
-            internalExecutorPool.executeBlocking(job::execute);
-            return job;
-        });
+        // Get the result, waiting for the specified wait time for result
+        job.asyncResult(serviceExecutorPool, waitTime)
+           .onComplete(v -> onComplete.accept(job, null));
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
index 9b1278a8..3101f421 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
@@ -45,6 +45,7 @@ import 
org.apache.cassandra.sidecar.handlers.KeyspaceSchemaHandler;
 import org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler;
 import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler;
 import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler;
+import org.apache.cassandra.sidecar.handlers.NodeDrainHandler;
 import org.apache.cassandra.sidecar.handlers.OperationalJobHandler;
 import org.apache.cassandra.sidecar.handlers.RingHandler;
 import org.apache.cassandra.sidecar.handlers.SchemaHandler;
@@ -156,6 +157,26 @@ public class CassandraOperationsModule extends 
AbstractModule
         return factory.buildRouteWithHandler(nodeDecommissionHandler);
     }
 
+    @PUT
+    @Path(ApiEndpointsV1.NODE_DRAIN_ROUTE)
+    @Operation(summary = "Drain node",
+               description = "Drains the Cassandra node by flushing memtables 
and stopping writes")
+    @APIResponse(description = "Node drain operation completed successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(implementation = 
OperationalJobResponse.class)))
+    @APIResponse(description = "Node drain operation initiated successfully",
+                 responseCode = "202",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(implementation = 
OperationalJobResponse.class)))
+    @ProvidesIntoMap
+    @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeDrainRouteKey.class)
+    VertxRoute cassandraNodeDrainRoute(RouteBuilder.Factory factory,
+                                       NodeDrainHandler nodeDrainHandler)
+    {
+        return factory.buildRouteWithHandler(nodeDrainHandler);
+    }
+
     @GET
     @Path(ApiEndpointsV1.STREAM_STATS_ROUTE)
     @Operation(summary = "Get stream statistics",
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
index e3bec846..b1561e79 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
@@ -83,6 +83,11 @@ public interface VertxRouteMapKeys
         HttpMethod HTTP_METHOD = HttpMethod.PUT;
         String ROUTE_URI = ApiEndpointsV1.NODE_DECOMMISSION_ROUTE;
     }
+    interface CassandraNodeDrainRouteKey extends RouteClassKey
+    {
+        HttpMethod HTTP_METHOD = HttpMethod.PUT;
+        String ROUTE_URI = ApiEndpointsV1.NODE_DRAIN_ROUTE;
+    }
     interface CassandraNodeSettingsRouteKey extends RouteClassKey
     {
         HttpMethod HTTP_METHOD = HttpMethod.GET;
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
index e36f1a89..e640b94e 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
 import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
 import org.apache.cassandra.sidecar.job.OperationalJob;
 
 import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
@@ -37,13 +38,26 @@ public class OperationalJobUtils
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OperationalJobUtils.class);
 
     /**
-     * In the operational job context, sends a {@link OperationalJobResponse} 
based on the status of the job.
+     * Sends an HTTP response with appropriate status code and {@link 
OperationalJobResponse} payload
+     * based on the operational job status and any conflict exceptions.
+     * If an exception is provided, responds with HTTP 409 CONFLICT and FAILED 
status.
+     * Otherwise, responds with HTTP 200 OK for completed jobs or HTTP 202 
ACCEPTED for ongoing jobs.
      *
-     * @param context the request context
-     * @param job     the operational job to reports status on
+     * @param context   the routing context for the HTTP request
+     * @param job       the operational job to report status on
+     * @param exception the conflict exception, if any (null if no conflict)
      */
-    public static void sendStatusBasedResponse(RoutingContext context, 
OperationalJob job)
+    public static void sendStatusBasedResponse(RoutingContext context, 
OperationalJob job, OperationalJobConflictException exception)
     {
+        if (exception != null)
+        {
+            String reason = exception.getMessage();
+            LOGGER.error("Conflicting job encountered. reason={}", reason);
+            
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
+            context.json(new OperationalJobResponse(job.jobId(), 
OperationalJobStatus.FAILED, job.name(), reason));
+            return;
+        }
+
         OperationalJobStatus status = job.status();
         LOGGER.info("Job completion status={} jobId={}", status, job.jobId());
         if (status.isCompleted())
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandlerTest.java
new file mode 100644
index 00000000..49e1604a
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandlerTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeDrainHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDrainHandlerTest
+{
+    static final Logger LOGGER = 
LoggerFactory.getLogger(NodeDrainHandlerTest.class);
+    static final String TEST_ROUTE = "/api/v1/cassandra/operations/drain";
+    static final String OPERATION_MODE_NORMAL = "NORMAL";
+    static final String OPERATION_MODE_DRAINING = "DRAINING";
+    static final String OPERATION_MODE_JOINING = "JOINING";
+    static final String EXPECTED_OPERATION_NAME = "drain";
+    static final String SIMULATED_DRAIN_FAILURE = "Simulated drain failure";
+
+    Vertx vertx;
+    Server server;
+    StorageOperations mockStorageOperations = mock(StorageOperations.class);
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Injector injector;
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new 
NodeDrainHandlerTest.NodeDrainTestModule());
+        injector = Guice.createInjector(Modules.override(SidecarModules.all())
+                                               .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testDrainLongRunning(VertxTestContext context) throws Exception
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+        doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null))
+        .when(mockStorageOperations).drain();
+
+        WebClient client = WebClient.create(vertx);
+        client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+              .expect(ResponsePredicate.SC_ACCEPTED)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+                  OperationalJobResponse drainResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(drainResponse).isNotNull();
+                  assertThat(drainResponse.status()).isEqualTo(RUNNING);
+                  assertThat(drainResponse.jobId()).isNotNull();
+                  
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testDrainCompleted(VertxTestContext context)
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+
+        WebClient client = WebClient.create(vertx);
+        client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+                  OperationalJobResponse drainResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(drainResponse).isNotNull();
+                  assertThat(drainResponse.status()).isEqualTo(SUCCEEDED);
+                  assertThat(drainResponse.jobId()).isNotNull();
+                  
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testDrainFailed(VertxTestContext context) throws Exception
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+        doThrow(new 
RuntimeException(SIMULATED_DRAIN_FAILURE)).when(mockStorageOperations).drain();
+
+        WebClient client = WebClient.create(vertx);
+        client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+                  OperationalJobResponse drainResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(drainResponse).isNotNull();
+                  assertThat(drainResponse.jobId()).isNotNull();
+                  
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testDrainConflictWhenDraining(VertxTestContext context)
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINING);
+
+        WebClient client = WebClient.create(vertx);
+        client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+              .expect(ResponsePredicate.SC_CONFLICT)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(CONFLICT.code());
+                  LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+                  OperationalJobResponse drainResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(drainResponse).isNotNull();
+                  assertThat(drainResponse.jobId()).isNotNull();
+                  
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testDrainAllowedWhenJoining(VertxTestContext context)
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_JOINING);
+
+        WebClient client = WebClient.create(vertx);
+        client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+                  OperationalJobResponse drainResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(drainResponse).isNotNull();
+                  assertThat(drainResponse.status()).isEqualTo(SUCCEEDED);
+                  assertThat(drainResponse.jobId()).isNotNull();
+                  
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+                  context.completeNow();
+              }));
+    }
+
+    /**
+     * Test Guice module for Node Drain handler tests
+     */
+    class NodeDrainTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        public InstancesMetadata instanceMetadata()
+        {
+            int instanceId = 100;
+            String host = "127.0.0.1";
+            InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
+            when(instanceMetadata.host()).thenReturn(host);
+            when(instanceMetadata.port()).thenReturn(9042);
+            when(instanceMetadata.id()).thenReturn(instanceId);
+            when(instanceMetadata.stagingDir()).thenReturn("");
+
+            CassandraAdapterDelegate delegate = 
mock(CassandraAdapterDelegate.class);
+
+            
when(delegate.storageOperations()).thenReturn(mockStorageOperations);
+            when(instanceMetadata.delegate()).thenReturn(delegate);
+
+            InstancesMetadata mockInstancesMetadata = 
mock(InstancesMetadata.class);
+            
when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+            
when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+            
when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+            return mockInstancesMetadata;
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java
new file mode 100644
index 00000000..72df6fb1
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link NodeDrainJob}
+ */
+class NodeDrainJobTest
+{
+    private static final String OPERATION_MODE_DRAINING = "DRAINING";
+    private static final String OPERATION_MODE_DRAINED = "DRAINED";
+    private static final String OPERATION_MODE_NORMAL = "NORMAL";
+    private static final String OPERATION_MODE_UNKNOWN = "UNKNOWN";
+    private static final String JOB_NAME_DRAIN = "drain";
+
+    private StorageOperations mockStorageOperations;
+    private NodeDrainJob nodeDrainJob;
+    private UUID jobId;
+
+    @BeforeEach
+    void setup()
+    {
+        mockStorageOperations = mock(StorageOperations.class);
+        jobId = UUIDs.timeBased();
+        nodeDrainJob = new NodeDrainJob(jobId, mockStorageOperations);
+    }
+
+    @Test
+    void testJobIdAndName()
+    {
+        assertThat(nodeDrainJob.jobId()).isEqualTo(jobId);
+        assertThat(nodeDrainJob.name()).isEqualTo(JOB_NAME_DRAIN);
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenDraining()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINING);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isTrue();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenDrained()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINED);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenNormal()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenUnknownState()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_UNKNOWN);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenNull()
+    {
+        when(mockStorageOperations.operationMode()).thenReturn(null);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testStatus_WhenDraining()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINING);
+
+        
assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.RUNNING);
+    }
+
+    @Test
+    void testStatus_WhenDrained()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINED);
+
+        
assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+    }
+
+    @Test
+    void testStatus_WhenNormal()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+
+        
assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.CREATED);
+    }
+
+    @Test
+    void testStatus_WhenUnknownState()
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_UNKNOWN);
+
+        
assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.CREATED);
+    }
+
+    @Test
+    void testStatus_WhenNull()
+    {
+        when(mockStorageOperations.operationMode()).thenReturn(null);
+
+        
assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.CREATED);
+    }
+
    @Test
    void testExecuteInternal_WhenNotRunning() throws IOException, ExecutionException, InterruptedException
    {
        // Node is in NORMAL mode (no drain in progress), so executeInternal
        // is expected to invoke drain() on the storage operations
        when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);

        nodeDrainJob.executeInternal();

        verify(mockStorageOperations).drain();
    }
+
    @Test
    void testExecuteInternal_WhenAlreadyDraining() throws IOException, ExecutionException, InterruptedException
    {
        // A drain already in progress must not be re-triggered: executeInternal
        // should be a no-op and never call drain() again
        when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINING);

        nodeDrainJob.executeInternal();

        verify(mockStorageOperations, never()).drain();
    }
+
+    @Test
+    void testExecuteInternal_WhenDrainThrowsIOException() throws IOException, 
ExecutionException, InterruptedException
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+        IOException ioException = new IOException("Drain failed due to IO 
error");
+        doThrow(ioException).when(mockStorageOperations).drain();
+
+        assertThatThrownBy(() -> nodeDrainJob.executeInternal())
+        .isInstanceOf(OperationalJobException.class)
+        .hasCause(ioException);
+
+        verify(mockStorageOperations).drain();
+    }
+
+    @Test
+    void testExecuteInternal_WhenDrainThrowsExecutionException() throws 
IOException, ExecutionException, InterruptedException
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+        ExecutionException executionException = new ExecutionException("Drain 
failed during execution", new RuntimeException());
+        doThrow(executionException).when(mockStorageOperations).drain();
+
+        assertThatThrownBy(() -> nodeDrainJob.executeInternal())
+        .isInstanceOf(OperationalJobException.class)
+        .hasCause(executionException);
+
+        verify(mockStorageOperations).drain();
+    }
+
+    @Test
+    void testExecuteInternal_WhenDrainThrowsInterruptedException() throws 
IOException, ExecutionException, InterruptedException
+    {
+        
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+        InterruptedException interruptedException = new 
InterruptedException("Drain was interrupted");
+        doThrow(interruptedException).when(mockStorageOperations).drain();
+
+        assertThatThrownBy(() -> nodeDrainJob.executeInternal())
+        .isInstanceOf(OperationalJobException.class)
+        .hasCause(interruptedException);
+
+        verify(mockStorageOperations).drain();
+    }
+
    @Test
    void testNodeDrainStateEnum_FromOperationMode()
    {
        // The two drain-related modes resolve to their matching enum constants
        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_DRAINING))
        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINING);

        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_DRAINED))
        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINED);

        // Every non-drain mode — including null and empty string — resolves to
        // null rather than throwing
        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_NORMAL))
        .isNull();

        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_UNKNOWN))
        .isNull();

        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(null))
        .isNull();

        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(""))
        .isNull();
    }
+
    @Test
    void testNodeDrainStateEnum_JobStatusMapping()
    {
        // Each drain state carries the operational job status it reports:
        // DRAINING -> RUNNING, DRAINED -> SUCCEEDED
        assertThat(NodeDrainJob.NodeDrainStateEnum.DRAINING.jobStatus)
        .isEqualTo(OperationalJobStatus.RUNNING);

        assertThat(NodeDrainJob.NodeDrainStateEnum.DRAINED.jobStatus)
        .isEqualTo(OperationalJobStatus.SUCCEEDED);
    }
+
    @Test
    void testNodeDrainStateEnum_Values()
    {
        // Guard against accidental additions/reordering: the enum must contain
        // exactly DRAINING and DRAINED, in that order
        NodeDrainJob.NodeDrainStateEnum[] expectedValues = {
        NodeDrainJob.NodeDrainStateEnum.DRAINING,
        NodeDrainJob.NodeDrainStateEnum.DRAINED
        };

        assertThat(NodeDrainJob.NodeDrainStateEnum.values()).containsExactly(expectedValues);
    }
+
    @Test
    void testNodeDrainStateEnum_ValueOf()
    {
        // valueOf works for the drain-mode constants (whose names match the
        // operation-mode strings), and rejects any non-constant name
        assertThat(NodeDrainJob.NodeDrainStateEnum.valueOf(OPERATION_MODE_DRAINING))
        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINING);

        assertThat(NodeDrainJob.NodeDrainStateEnum.valueOf(OPERATION_MODE_DRAINED))
        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINED);

        assertThatThrownBy(() -> NodeDrainJob.NodeDrainStateEnum.valueOf(OPERATION_MODE_NORMAL))
        .isInstanceOf(IllegalArgumentException.class);
    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
index f7b6cfe9..d3574564 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
@@ -19,29 +19,26 @@
 package org.apache.cassandra.sidecar.job;
 
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.datastax.driver.core.utils.UUIDs;
-import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.TestResourceReaper;
 import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
 import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
-import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
 
 import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
 import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests to validate the Job submission behavior for scenarios which are a 
combination of values for
@@ -71,59 +68,74 @@ class OperationalJobManagerTest
     }
 
     @Test
-    void testWithNoDownstreamJob()
+    void testWithNoDownstreamJob() throws InterruptedException
     {
         OperationalJobTracker tracker = new OperationalJobTracker(4);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
+        CountDownLatch latch = new CountDownLatch(1);
 
         OperationalJob testJob = 
OperationalJobTest.createOperationalJob(SUCCEEDED);
-        manager.trySubmitJob(testJob);
-        testJob.execute(Promise.promise());
+        BiConsumer<OperationalJob, OperationalJobConflictException> onComplete 
= (job, exception) -> {
+            assertThat(exception).isNull();
+            latch.countDown();
+        };
+
+        manager.trySubmitJob(testJob, onComplete, executorPool.service(), 
SecondBoundConfiguration.parse("5s"));
+        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
         assertThat(testJob.asyncResult().isComplete()).isTrue();
         assertThat(testJob.status()).isEqualTo(SUCCEEDED);
         assertThat(tracker.get(testJob.jobId())).isNotNull();
     }
 
     @Test
-    void testWithRunningDownstreamJob()
+    void testWithRunningDownstreamJob() throws InterruptedException
     {
         OperationalJob runningJob = 
OperationalJobTest.createOperationalJob(RUNNING);
         OperationalJobTracker tracker = new OperationalJobTracker(4);
-        ExecutorPools mockPools = mock(ExecutorPools.class);
-        TaskExecutorPool mockExecPool = mock(TaskExecutorPool.class);
-        when(mockPools.internal()).thenReturn(mockExecPool);
-        when(mockExecPool.runBlocking(any())).thenReturn(null);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
-        assertThatThrownBy(() -> manager.trySubmitJob(runningJob))
-        .isExactlyInstanceOf(OperationalJobConflictException.class)
-        .hasMessage("The same operational job is already running on Cassandra. 
operationName='Operation X'");
+        CountDownLatch latch = new CountDownLatch(1);
+
+        BiConsumer<OperationalJob, OperationalJobConflictException> onComplete 
= (job, exception) -> {
+            
assertThat(exception).isInstanceOf(OperationalJobConflictException.class);
+            assertThat(exception.getMessage()).isEqualTo("The same operational 
job is already running on Cassandra. operationName='Operation X'");
+            latch.countDown();
+        };
+
+        manager.trySubmitJob(runningJob, onComplete, executorPool.service(), 
SecondBoundConfiguration.parse("5s"));
+        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
     }
 
     @Test
-    void testWithLongRunningJob()
+    void testWithLongRunningJob() throws InterruptedException
     {
         UUID jobId = UUIDs.timeBased();
-
         OperationalJobTracker tracker = new OperationalJobTracker(4);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
+        CountDownLatch latch = new CountDownLatch(1);
 
-        OperationalJob testJob = 
OperationalJobTest.createOperationalJob(jobId, 
SecondBoundConfiguration.parse("10s"));
+        OperationalJob testJob = 
OperationalJobTest.createOperationalJob(jobId, 
SecondBoundConfiguration.parse("2s"));
+        BiConsumer<OperationalJob, OperationalJobConflictException> onComplete 
= (job, exception) -> {
+            assertThat(exception).isNull();
+            latch.countDown();
+        };
 
-        manager.trySubmitJob(testJob);
-        // execute the job async.
-        vertx.executeBlocking(testJob::execute);
-        // by the time of checking, the job should still be running. It runs 
for 10 seconds.
+        manager.trySubmitJob(testJob, onComplete, executorPool.service(), 
SecondBoundConfiguration.parse("5s"));
+        // Job should be running initially
         assertThat(testJob.asyncResult().isComplete()).isFalse();
         assertThat(tracker.get(jobId)).isNotNull();
+        
+        // Wait for completion
+        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(testJob.asyncResult().isComplete()).isTrue();
     }
 
     @Test
-    void testWithFailingJob()
+    void testWithFailingJob() throws InterruptedException
     {
         UUID jobId = UUIDs.timeBased();
-
         OperationalJobTracker tracker = new OperationalJobTracker(4);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
+        CountDownLatch latch = new CountDownLatch(1);
 
         String msg = "Test Job failed";
         OperationalJob failingJob = new OperationalJob(jobId)
@@ -141,10 +153,15 @@ class OperationalJobManagerTest
             }
         };
 
-        manager.trySubmitJob(failingJob);
-        failingJob.execute(Promise.promise());
-        assertThat(failingJob.asyncResult().isComplete()).isTrue();
-        assertThat(failingJob.asyncResult().failed()).isTrue();
+        BiConsumer<OperationalJob, OperationalJobConflictException> onComplete 
= (job, exception) -> {
+            assertThat(exception).isNull(); // Exception handled internally by 
job
+            assertThat(job.asyncResult().isComplete()).isTrue();
+            assertThat(job.asyncResult().failed()).isTrue();
+            latch.countDown();
+        };
+
+        manager.trySubmitJob(failingJob, onComplete, executorPool.service(), 
SecondBoundConfiguration.parse("5s"));
+        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
         assertThat(tracker.get(jobId)).isNotNull();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to