This is an automated email from the ASF dual-hosted git repository.
saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b171b45 CASSSIDECAR-342: Sidecar endpoint for draining a node (#258)
3b171b45 is described below
commit 3b171b45f6f68742e6c2564c8ec4506aa22423c0
Author: Sudipta <[email protected]>
AuthorDate: Tue Oct 28 14:01:27 2025 -0700
CASSSIDECAR-342: Sidecar endpoint for draining a node (#258)
Patch by Sudipta Laha; reviewed by Francisco Guerrero, Arjun Ashok, Saranya
Krishnakumar for CASSSIDECAR-342
---
CHANGES.txt | 1 +
.../adapters/base/CassandraStorageOperations.java | 10 +
.../jmx/GossipDependentStorageJmxOperations.java | 6 +
.../adapters/base/jmx/StorageJmxOperations.java | 5 +
.../cassandra/sidecar/common/ApiEndpointsV1.java | 1 +
.../sidecar/common/request/NodeDrainRequest.java | 46 ++++
.../cassandra/sidecar/client/RequestContext.java | 13 +
.../cassandra/sidecar/client/SidecarClient.java | 14 ++
.../sidecar/client/SidecarClientTest.java | 19 ++
.../CassandraNodeOperationsIntegrationTest.java | 163 +++++++++++++
.../sidecar/common/server/StorageOperations.java | 5 +
.../acl/authorization/BasicPermissions.java | 1 +
.../sidecar/handlers/NodeDecommissionHandler.java | 25 +-
...ommissionHandler.java => NodeDrainHandler.java} | 54 ++---
.../sidecar/handlers/OperationalJobHandler.java | 2 +-
.../apache/cassandra/sidecar/job/NodeDrainJob.java | 142 +++++++++++
.../sidecar/job/OperationalJobManager.java | 43 +++-
.../sidecar/modules/CassandraOperationsModule.java | 21 ++
.../modules/multibindings/VertxRouteMapKeys.java | 5 +
.../sidecar/utils/OperationalJobUtils.java | 22 +-
.../sidecar/handlers/NodeDrainHandlerTest.java | 243 +++++++++++++++++++
.../cassandra/sidecar/job/NodeDrainJobTest.java | 268 +++++++++++++++++++++
.../sidecar/job/OperationalJobManagerTest.java | 77 +++---
23 files changed, 1087 insertions(+), 99 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4decb80f..21a0bcc1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Sidecar endpoint for draining a node (CASSSIDECAR-342)
* Avoid resuming stream early during SSTable upload (CASSSIDECAR-359)
* Add cache for Authorization layer (CASSSIDECAR-357)
* Avoid creating objects in the CassandraAdapter implementation
(CASSSIDECAR-355)
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index 7bfdfdd4..db8b14b1 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -236,6 +236,16 @@ public class CassandraStorageOperations implements
StorageOperations
.decommission(force);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void drain() throws IOException, InterruptedException,
ExecutionException
+ {
+ jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
+ .drain();
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
index a40255ed..fb753237 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java
@@ -172,6 +172,12 @@ public class GossipDependentStorageJmxOperations
implements StorageJmxOperations
delegate.decommission(force);
}
+ @Override
+ public void drain() throws IOException, InterruptedException,
ExecutionException
+ {
+ delegate.drain();
+ }
+
@Override
public String getClusterName()
{
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
index c76ccce6..11e838c4 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java
@@ -168,6 +168,11 @@ public interface StorageJmxOperations
*/
void decommission(boolean force) throws IllegalStateException,
IllegalArgumentException, UnsupportedOperationException;
+ /**
+ * Triggers the node drain operation
+ */
+ void drain() throws IOException, InterruptedException, ExecutionException;
+
/**
* Fetch the operation-mode of the node
*
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index d67e4cdc..923342f5 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -140,6 +140,7 @@ public final class ApiEndpointsV1
public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 +
CASSANDRA + OPERATIONAL_JOBS;
public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA +
PER_OPERATIONAL_JOB;
public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA +
"/operations/decommission";
+ public static final String NODE_DRAIN_ROUTE = API_V1 + CASSANDRA +
"/operations/drain";
public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA +
"/stats/streams";
public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA +
PER_KEYSPACE + PER_TABLE + "/stats";
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDrainRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDrainRequest.java
new file mode 100644
index 00000000..4ebd1818
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDrainRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+
+/**
+ * Represents a request to execute a node drain operation
+ */
+public class NodeDrainRequest extends JsonRequest<OperationalJobResponse>
+{
+ /**
+ * Constructs a request to execute a node drain operation
+ */
+ public NodeDrainRequest()
+ {
+ super(ApiEndpointsV1.NODE_DRAIN_ROUTE);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.PUT;
+ }
+}
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 2e630c31..4bf13bd6 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -45,6 +45,7 @@ import
org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest;
import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest;
import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest;
+import org.apache.cassandra.sidecar.common.request.NodeDrainRequest;
import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
import org.apache.cassandra.sidecar.common.request.OperationalJobRequest;
import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest;
@@ -89,6 +90,7 @@ public class RequestContext
protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new
GossipInfoRequest();
protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new
ListOperationalJobsRequest();
protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST =
new NodeDecommissionRequest();
+ protected static final NodeDrainRequest NODE_DRAIN_REQUEST = new
NodeDrainRequest();
protected static final StreamStatsRequest STREAM_STATS_REQUEST = new
StreamStatsRequest();
protected static final LifecycleInfoRequest LIFECYCLE_INFO_REQUEST = new
LifecycleInfoRequest();
@@ -582,6 +584,17 @@ public class RequestContext
return request(NODE_DECOMMISSION_REQUEST);
}
+ /**
+ * Sets the {@code request} to be a {@link NodeDrainRequest} and
returns a reference to this Builder
+ * enabling method chaining.
+ *
+ * @return a reference to this Builder
+ */
+ public Builder nodeDrainRequest()
+ {
+ return request(NODE_DRAIN_REQUEST);
+ }
+
/**
* Sets the {@code request} to be a {@link GossipUpdateRequest} for the
* given {@link NodeCommandRequestPayload.State state}, and returns a
reference to this Builder enabling method chaining.
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 373bd336..9ce315fb 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -823,6 +823,20 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
.build());
}
+ /**
+ * Executes the node drain request using the default retry policy and
configured selection policy
+ *
+ * @param instance the instance where the request will be executed
+ * @return a completable future of the operational job response for the drain request
+ */
+ public CompletableFuture<OperationalJobResponse> nodeDrain(SidecarInstance
instance)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+
.singleInstanceSelectionPolicy(instance)
+ .nodeDrainRequest()
+ .build());
+ }
+
/**
* Sends a request to start or stop Cassandra gossiping on the provided
instance.
* <p>
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index b13a209b..5071d0e7 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -1393,6 +1393,25 @@ abstract class SidecarClientTest
validateResponseServed(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE);
}
+ @Test
+ public void testNodeDrain() throws Exception
+ {
+ UUID jobId = UUID.randomUUID();
+ String nodeDrainString = "{\"jobId\":\"" + jobId +
"\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}";
+
+ MockResponse response = new MockResponse()
+ .setResponseCode(OK.code())
+ .setHeader("content-type", "application/json")
+ .setBody(nodeDrainString);
+ enqueue(response);
+
+ SidecarInstanceImpl sidecarInstance =
RequestExecutorTest.newSidecarInstance(servers.get(0));
+ OperationalJobResponse result =
client.nodeDrain(sidecarInstance).get(30, TimeUnit.SECONDS);
+ assertThat(result).isNotNull();
+ assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+ validateResponseServed(ApiEndpointsV1.NODE_DRAIN_ROUTE);
+ }
+
@Test
void testFailsWithOneAttemptPerServer()
{
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
new file mode 100644
index 00000000..8557b451
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpResponse;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for Cassandra node drain operations
+ */
+public class CassandraNodeOperationsIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
+{
+ public static final String CASSANDRA_VERSION_4_0 = "4.0";
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ // No schema init needed
+ }
+
+ @Override
+ protected void beforeTestStart()
+ {
+ // wait for the schema initialization
+ waitForSchemaReady(30, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void testNodeDrainOperationSuccess()
+ {
+ // Initiate drain operation
+ HttpResponse<Buffer> drainResponse = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.NODE_DRAIN_ROUTE)
+ .send());
+
+ assertThat(drainResponse.statusCode()).isEqualTo(OK.code());
+
+ JsonObject responseBody = drainResponse.bodyAsJsonObject();
+ assertThat(responseBody).isNotNull();
+ assertThat(responseBody.getString("jobId")).isNotNull();
+ assertThat(responseBody.getString("jobStatus")).isIn(
+ OperationalJobStatus.CREATED.name(),
+ OperationalJobStatus.RUNNING.name(),
+ OperationalJobStatus.SUCCEEDED.name()
+ );
+
+ loopAssert(30, 500, () -> {
+ // Verify node status is DRAINED by checking the operationMode via
stream stats endpoint
+ HttpResponse<Buffer> streamStatsResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.STREAM_STATS_ROUTE)
+ .send());
+
+ assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code());
+
+ JsonObject streamStats = streamStatsResponse.bodyAsJsonObject();
+ assertThat(streamStats).isNotNull();
+
assertThat(streamStats.getString("operationMode")).isEqualTo("DRAINED");
+ });
+
+ // Validate the operational job status using the OperationalJobHandler
+ String jobId = responseBody.getString("jobId");
+ validateOperationalJobStatus(jobId, "drain");
+ }
+
+ /**
+ * Validates the operational job status by querying the
OperationalJobHandler endpoint
+ * and waiting for the job to reach a final state if necessary.
+ *
+ * @param jobId the ID of the operational job to validate
+ * @param expectedOperation the expected operation name (e.g., "move",
"decommission", "drain")
+ */
+ private void validateOperationalJobStatus(String jobId, String
expectedOperation)
+ {
+ String operationalJobRoute =
ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId);
+
+ HttpResponse<Buffer> jobStatusResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
operationalJobRoute)
+ .send());
+
+ assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code());
+
+ JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject();
+ assertThat(jobStatusBody).isNotNull();
+ assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId);
+
assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation);
+ assertThat(jobStatusBody.getString("jobStatus")).isIn(
+ OperationalJobStatus.RUNNING.name(),
+ OperationalJobStatus.SUCCEEDED.name()
+ );
+
+ // If the job is still running, wait for it to complete or reach a
final state
+ if
(OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus")))
+ {
+ loopAssert(30, 500, () -> {
+ HttpResponse<Buffer> finalJobStatusResponse = getBlocking(
+ trustedClient().get(serverWrapper.serverPort, "localhost",
operationalJobRoute)
+ .send());
+
+
assertThat(finalJobStatusResponse.statusCode()).isEqualTo(OK.code());
+
+ JsonObject finalJobStatusBody =
finalJobStatusResponse.bodyAsJsonObject();
+ assertThat(finalJobStatusBody).isNotNull();
+ assertThat(finalJobStatusBody.getString("jobStatus")).isIn(
+ OperationalJobStatus.SUCCEEDED.name(),
+ OperationalJobStatus.FAILED.name()
+ );
+ });
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ catch (IllegalStateException ex)
+ {
+ logger.error("Exception in tear down", ex);
+ // When cluster.close() is called after drain on Cassandra 4.0,
+ // it throws IllegalStateException "HintsService has already been
shut down".
+ if (!CASSANDRA_VERSION_4_0.equals(this.testVersion.version()))
+ {
+ throw ex;
+ }
+ logger.warn("Suppressing {} for Cassandra version {}",
+ ex.getClass().getCanonicalName(),
CASSANDRA_VERSION_4_0);
+ }
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
index d135e61d..021afa52 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
@@ -118,6 +118,11 @@ public interface StorageOperations
*/
void decommission(boolean force);
+ /**
+ * Triggers the node drain operation
+ */
+ void drain() throws IOException, InterruptedException, ExecutionException;
+
/**
* @return returns true if gossip is running, false otherwise
*/
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index b57ac623..1247739e 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -62,6 +62,7 @@ public class BasicPermissions
// sidecar operation related permissions
public static final Permission READ_OPERATIONAL_JOB = new
DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE);
public static final Permission DECOMMISSION_NODE = new
DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE);
+ public static final Permission DRAIN_NODE = new
DomainAwarePermission("NODE:DRAIN", OPERATION_SCOPE);
// Permissions related to Schema Reporting
public static final Permission REPORT_SCHEMA = new
DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE);
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
index 63b5b66e..d2c522eb 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
@@ -23,18 +23,14 @@ import java.util.Set;
import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Inject;
-import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.auth.authorization.Authorization;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
import org.apache.cassandra.sidecar.job.OperationalJobManager;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
@@ -89,22 +85,11 @@ public class NodeDecommissionHandler extends
AbstractHandler<Boolean> implements
{
StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(),
operations, isForce);
- try
- {
- jobManager.trySubmitJob(job);
- }
- catch (OperationalJobConflictException oje)
- {
- String reason = oje.getMessage();
- logger.error("Conflicting job encountered. reason={}", reason);
-
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
- context.json(new OperationalJobResponse(job.jobId(),
OperationalJobStatus.FAILED, job.name(), reason));
- return;
- }
-
- // Get the result, waiting for the specified wait time for result
- job.asyncResult(executorPools.service(),
config.operationalJobExecutionMaxWaitTime())
- .onComplete(v ->
OperationalJobUtils.sendStatusBasedResponse(context, job));
+ this.jobManager.trySubmitJob(job,
+ (completedJob, exception) ->
+
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
+ executorPools.service(),
+
config.operationalJobExecutionMaxWaitTime());
}
/**
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java
similarity index 57%
copy from
server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
copy to
server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java
index 63b5b66e..b0de6753 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandler.java
@@ -23,31 +23,25 @@ import java.util.Set;
import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Inject;
-import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.auth.authorization.Authorization;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
-import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
+import org.apache.cassandra.sidecar.job.NodeDrainJob;
import org.apache.cassandra.sidecar.job.OperationalJobManager;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
import org.jetbrains.annotations.NotNull;
-import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
-
/**
- * Provides REST API for asynchronously decommissioning the corresponding
Cassandra node
+ * Provides REST API for asynchronously draining the corresponding Cassandra
node
*/
-public class NodeDecommissionHandler extends AbstractHandler<Boolean>
implements AccessProtected
+public class NodeDrainHandler extends AbstractHandler<Void> implements
AccessProtected
{
private final OperationalJobManager jobManager;
private final ServiceConfiguration config;
@@ -60,11 +54,11 @@ public class NodeDecommissionHandler extends
AbstractHandler<Boolean> implements
* @param validator a validator instance to validate
Cassandra-specific input
*/
@Inject
- protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
- ExecutorPools executorPools,
- ServiceConfiguration
serviceConfiguration,
- CassandraInputValidator validator,
- OperationalJobManager jobManager)
+ protected NodeDrainHandler(InstanceMetadataFetcher metadataFetcher,
+ ExecutorPools executorPools,
+ ServiceConfiguration serviceConfiguration,
+ CassandraInputValidator validator,
+ OperationalJobManager jobManager)
{
super(metadataFetcher, executorPools, validator);
this.jobManager = jobManager;
@@ -74,7 +68,7 @@ public class NodeDecommissionHandler extends
AbstractHandler<Boolean> implements
@Override
public Set<Authorization> requiredAuthorizations()
{
- return
Collections.singleton(BasicPermissions.DECOMMISSION_NODE.toAuthorization());
+ return
Collections.singleton(BasicPermissions.DRAIN_NODE.toAuthorization());
}
/**
@@ -85,34 +79,24 @@ public class NodeDecommissionHandler extends
AbstractHandler<Boolean> implements
HttpServerRequest httpRequest,
@NotNull String host,
SocketAddress remoteAddress,
- Boolean isForce)
+ Void unused)
{
StorageOperations operations =
metadataFetcher.delegate(host).storageOperations();
- NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(),
operations, isForce);
- try
- {
- jobManager.trySubmitJob(job);
- }
- catch (OperationalJobConflictException oje)
- {
- String reason = oje.getMessage();
- logger.error("Conflicting job encountered. reason={}", reason);
-
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
- context.json(new OperationalJobResponse(job.jobId(),
OperationalJobStatus.FAILED, job.name(), reason));
- return;
- }
-
- // Get the result, waiting for the specified wait time for result
- job.asyncResult(executorPools.service(),
config.operationalJobExecutionMaxWaitTime())
- .onComplete(v ->
OperationalJobUtils.sendStatusBasedResponse(context, job));
+ NodeDrainJob job = new NodeDrainJob(UUIDs.timeBased(), operations);
+ this.jobManager.trySubmitJob(job,
+ (completedJob, exception) ->
+
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
+ executorPools.service(),
+
config.operationalJobExecutionMaxWaitTime());
}
/**
* {@inheritDoc}
*/
@Override
- protected Boolean extractParamsOrThrow(RoutingContext context)
+ protected Void extractParamsOrThrow(RoutingContext context)
{
- return parseBooleanQueryParam(context.request(), "force", false);
+ // No parameters needed for drain operation
+ return null;
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
index ac483974..44925c72 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
@@ -85,7 +85,7 @@ public class OperationalJobHandler extends
AbstractHandler<UUID> implements Acce
return job;
})
.onFailure(cause -> processFailure(cause, context, host,
remoteAddress, jobId))
- .onSuccess(job ->
OperationalJobUtils.sendStatusBasedResponse(context, job));
+ .onSuccess(job ->
OperationalJobUtils.sendStatusBasedResponse(context, job, null));
}
/**
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java
new file mode 100644
index 00000000..d6485764
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDrainJob.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node drain operation.
+ */
+public class NodeDrainJob extends OperationalJob
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NodeDrainJob.class);
+ private static final String OPERATION = "drain";
+ protected StorageOperations storageOperations;
+
+ /**
+ * Enum representing the various drain states of a Cassandra node.
+ */
+ public enum NodeDrainStateEnum
+ {
+ DRAINING(OperationalJobStatus.RUNNING),
+ DRAINED(OperationalJobStatus.SUCCEEDED);
+
+ final OperationalJobStatus jobStatus;
+
+ NodeDrainStateEnum(OperationalJobStatus jobStatus)
+ {
+ this.jobStatus = jobStatus;
+ }
+
+ /**
+ * Converts an operation mode string to a {@link NodeDrainStateEnum}.
+ *
+ * @param operationMode the operation mode string
+ * @return the corresponding {@link NodeDrainStateEnum}, or {@code null} if the mode is not a drain state
+ */
+ public static NodeDrainStateEnum fromOperationMode(String
operationMode)
+ {
+ try
+ {
+ return valueOf(operationMode);
+ }
+ catch (IllegalArgumentException | NullPointerException e)
+ {
+ LOGGER.error("No matching NodeDrainStateEnum found for
Cassandra node status={}. Error={}",
+ operationMode, e);
+ return null;
+ }
+ }
+ }
+
+ public NodeDrainJob(UUID jobId, StorageOperations storageOps)
+ {
+ super(jobId);
+ this.storageOperations = storageOps;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isRunningOnCassandra()
+ {
+ String operationMode = storageOperations.operationMode();
+ NodeDrainStateEnum nodeDrainStateEnum =
NodeDrainStateEnum.fromOperationMode(operationMode);
+ return nodeDrainStateEnum == NodeDrainStateEnum.DRAINING;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OperationalJobStatus status()
+ {
+ String operationMode = storageOperations.operationMode();
+ NodeDrainStateEnum nodeDrainStateEnum =
NodeDrainStateEnum.fromOperationMode(operationMode);
+
+ if (nodeDrainStateEnum != null)
+ {
+ return nodeDrainStateEnum.jobStatus;
+ }
+ return super.status();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void executeInternal()
+ {
+ if (isRunningOnCassandra())
+ {
+ LOGGER.info("Not executing job as an ongoing drain operation was
found jobId={}", this.jobId());
+ return;
+ }
+
+ LOGGER.info("Executing drain operation. jobId={}", this.jobId());
+ try
+ {
+ storageOperations.drain();
+ }
+ catch (IOException | ExecutionException | InterruptedException ex)
+ {
+ LOGGER.error("Job execution failed. jobId={} reason='{}'",
this.jobId(), ex.getMessage(), ex);
+ throw OperationalJobException.wraps(ex);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String name()
+ {
+ return OPERATION;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
index 3c37ddaa..6d36f353 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
@@ -20,10 +20,16 @@ package org.apache.cassandra.sidecar.job;
import java.util.List;
import java.util.UUID;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
@@ -34,6 +40,7 @@ import
org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
@Singleton
public class OperationalJobManager
{
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private final OperationalJobTracker jobTracker;
private final TaskExecutorPool internalExecutorPool;
@@ -78,19 +85,37 @@ public class OperationalJobManager
* Try to submit the job to execute asynchronously, if it is not currently
being
* tracked and not running. The job is triggered on a separate internal
thread-pool.
* The job execution failure behavior is tracked within the {@link
OperationalJob}.
+ * This method provides a callback mechanism and configurable wait time
for job completion.
*
- * @param job OperationalJob instance to submit
- * @throws OperationalJobConflictException when the same operational job
is already running on Cassandra
+ * @param job OperationalJob instance to submit
+ * @param onComplete callback to invoke when the job completes
(successfully or with exception)
+ * @param serviceExecutorPool the executor pool to use for waiting on job
completion
+ * @param waitTime the maximum time to wait for job completion
before returning
*/
- public void trySubmitJob(OperationalJob job) throws
OperationalJobConflictException
+ public void trySubmitJob(OperationalJob job,
+ BiConsumer<OperationalJob,
OperationalJobConflictException> onComplete,
+ TaskExecutorPool serviceExecutorPool,
+ DurationSpec waitTime)
{
- checkConflict(job);
+ try
+ {
+ // checkConflict is the expected source of the OperationalJobConflictException handled below
+ checkConflict(job);
+
+ // New job is submitted for all cases when we do not have a
corresponding downstream job
+ jobTracker.computeIfAbsent(job.jobId(), jobId -> {
+ internalExecutorPool.executeBlocking(job::execute);
+ return job;
+ });
+ }
+ catch (OperationalJobConflictException oje)
+ {
+ // Report the conflict through the callback and stop; the job result is not awaited in this case
+ onComplete.accept(job, oje);
+ return;
+ }
- // New job is submitted for all cases when we do not have a
corresponding downstream job
- jobTracker.computeIfAbsent(job.jobId(), jobId -> {
- internalExecutorPool.executeBlocking(job::execute);
- return job;
- });
+ // Get the result, waiting for the specified wait time for result
+ // The callback is handed a null exception here; the outcome of the returned future itself is not forwarded
+ job.asyncResult(serviceExecutorPool, waitTime)
+ .onComplete(v -> onComplete.accept(job, null));
}
/**
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
index 9b1278a8..3101f421 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
@@ -45,6 +45,7 @@ import
org.apache.cassandra.sidecar.handlers.KeyspaceSchemaHandler;
import org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler;
import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler;
import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler;
+import org.apache.cassandra.sidecar.handlers.NodeDrainHandler;
import org.apache.cassandra.sidecar.handlers.OperationalJobHandler;
import org.apache.cassandra.sidecar.handlers.RingHandler;
import org.apache.cassandra.sidecar.handlers.SchemaHandler;
@@ -156,6 +157,26 @@ public class CassandraOperationsModule extends
AbstractModule
return factory.buildRouteWithHandler(nodeDecommissionHandler);
}
+ /**
+ * Contributes the node drain route to the route map: a PUT on
+ * {@code ApiEndpointsV1.NODE_DRAIN_ROUTE} served by the {@link NodeDrainHandler}.
+ * The annotations document a 200 response and a 202 response, both carrying an
+ * {@code OperationalJobResponse}.
+ * NOTE(review): a 409 CONFLICT response is not documented here; confirm whether the handler can
+ * return one and, if so, add a matching {@code @APIResponse}.
+ *
+ * @param factory factory used to build the route around the handler
+ * @param nodeDrainHandler handler serving the drain request
+ * @return the route built by the factory for the handler
+ */
+ @PUT
+ @Path(ApiEndpointsV1.NODE_DRAIN_ROUTE)
+ @Operation(summary = "Drain node",
+ description = "Drains the Cassandra node by flushing memtables
and stopping writes")
+ @APIResponse(description = "Node drain operation completed successfully",
+ responseCode = "200",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation =
OperationalJobResponse.class)))
+ @APIResponse(description = "Node drain operation initiated successfully",
+ responseCode = "202",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation =
OperationalJobResponse.class)))
+ @ProvidesIntoMap
+ @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeDrainRouteKey.class)
+ VertxRoute cassandraNodeDrainRoute(RouteBuilder.Factory factory,
+ NodeDrainHandler nodeDrainHandler)
+ {
+ return factory.buildRouteWithHandler(nodeDrainHandler);
+ }
+
@GET
@Path(ApiEndpointsV1.STREAM_STATS_ROUTE)
@Operation(summary = "Get stream statistics",
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
index e3bec846..b1561e79 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
@@ -83,6 +83,11 @@ public interface VertxRouteMapKeys
HttpMethod HTTP_METHOD = HttpMethod.PUT;
String ROUTE_URI = ApiEndpointsV1.NODE_DECOMMISSION_ROUTE;
}
+ /**
+ * Route key for the node drain endpoint: HTTP PUT on {@code ApiEndpointsV1.NODE_DRAIN_ROUTE}.
+ */
+ interface CassandraNodeDrainRouteKey extends RouteClassKey
+ {
+ HttpMethod HTTP_METHOD = HttpMethod.PUT;
+ String ROUTE_URI = ApiEndpointsV1.NODE_DRAIN_ROUTE;
+ }
interface CassandraNodeSettingsRouteKey extends RouteClassKey
{
HttpMethod HTTP_METHOD = HttpMethod.GET;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
index e36f1a89..e640b94e 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
import org.apache.cassandra.sidecar.job.OperationalJob;
import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
@@ -37,13 +38,26 @@ public class OperationalJobUtils
private static final Logger LOGGER =
LoggerFactory.getLogger(OperationalJobUtils.class);
/**
- * In the operational job context, sends a {@link OperationalJobResponse}
based on the status of the job.
+ * Sends an HTTP response with appropriate status code and {@link
OperationalJobResponse} payload
+ * based on the operational job status and any conflict exceptions.
+ * If an exception is provided, responds with HTTP 409 CONFLICT and FAILED
status.
+ * Otherwise, responds with HTTP 200 OK for completed jobs or HTTP 202
ACCEPTED for ongoing jobs.
*
- * @param context the request context
- * @param job the operational job to reports status on
+ * @param context the routing context for the HTTP request
+ * @param job the operational job to report status on
+ * @param exception the conflict exception, if any (null if no conflict)
*/
- public static void sendStatusBasedResponse(RoutingContext context,
OperationalJob job)
+ public static void sendStatusBasedResponse(RoutingContext context,
OperationalJob job, OperationalJobConflictException exception)
{
+ if (exception != null)
+ {
+ String reason = exception.getMessage();
+ LOGGER.error("Conflicting job encountered. reason={}", reason);
+
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
+ context.json(new OperationalJobResponse(job.jobId(),
OperationalJobStatus.FAILED, job.name(), reason));
+ return;
+ }
+
OperationalJobStatus status = job.status();
LOGGER.info("Job completion status={} jobId={}", status, job.jobId());
if (status.isCompleted())
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandlerTest.java
new file mode 100644
index 00000000..49e1604a
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeDrainHandlerTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeDrainHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDrainHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(NodeDrainHandlerTest.class);
+ static final String TEST_ROUTE = "/api/v1/cassandra/operations/drain";
+ static final String OPERATION_MODE_NORMAL = "NORMAL";
+ static final String OPERATION_MODE_DRAINING = "DRAINING";
+ static final String OPERATION_MODE_JOINING = "JOINING";
+ static final String EXPECTED_OPERATION_NAME = "drain";
+ static final String SIMULATED_DRAIN_FAILURE = "Simulated drain failure";
+
+ Vertx vertx;
+ Server server;
+ StorageOperations mockStorageOperations = mock(StorageOperations.class);
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ Module testOverride = Modules.override(new TestModule())
+ .with(new
NodeDrainHandlerTest.NodeDrainTestModule());
+ injector = Guice.createInjector(Modules.override(SidecarModules.all())
+ .with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onSuccess(res -> closeLatch.countDown());
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ @Test
+ void testDrainLongRunning(VertxTestContext context) throws Exception
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null))
+ .when(mockStorageOperations).drain();
+
+ WebClient client = WebClient.create(vertx);
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .expect(ResponsePredicate.SC_ACCEPTED)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+ OperationalJobResponse drainResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(drainResponse).isNotNull();
+ assertThat(drainResponse.status()).isEqualTo(RUNNING);
+ assertThat(drainResponse.jobId()).isNotNull();
+
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDrainCompleted(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+
+ WebClient client = WebClient.create(vertx);
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+ OperationalJobResponse drainResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(drainResponse).isNotNull();
+ assertThat(drainResponse.status()).isEqualTo(SUCCEEDED);
+ assertThat(drainResponse.jobId()).isNotNull();
+
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDrainFailed(VertxTestContext context) throws Exception
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_NORMAL);
+ doThrow(new
RuntimeException(SIMULATED_DRAIN_FAILURE)).when(mockStorageOperations).drain();
+
+ WebClient client = WebClient.create(vertx);
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+ OperationalJobResponse drainResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(drainResponse).isNotNull();
+ assertThat(drainResponse.jobId()).isNotNull();
+
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDrainConflictWhenDraining(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_DRAINING);
+
+ WebClient client = WebClient.create(vertx);
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .expect(ResponsePredicate.SC_CONFLICT)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(CONFLICT.code());
+ LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+ OperationalJobResponse drainResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(drainResponse).isNotNull();
+ assertThat(drainResponse.jobId()).isNotNull();
+
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testDrainAllowedWhenJoining(VertxTestContext context)
+ {
+
when(mockStorageOperations.operationMode()).thenReturn(OPERATION_MODE_JOINING);
+
+ WebClient client = WebClient.create(vertx);
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .send(context.succeeding(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ LOGGER.info("Drain Response: {}", response.bodyAsString());
+
+ OperationalJobResponse drainResponse =
response.bodyAsJson(OperationalJobResponse.class);
+ assertThat(drainResponse).isNotNull();
+ assertThat(drainResponse.status()).isEqualTo(SUCCEEDED);
+ assertThat(drainResponse.jobId()).isNotNull();
+
assertThat(drainResponse.operation()).isEqualTo(EXPECTED_OPERATION_NAME);
+ context.completeNow();
+ }));
+ }
+
+ /**
+ * Test guice module for Node Drain handler tests
+ */
+ class NodeDrainTestModule extends AbstractModule
+ {
+ @Provides
+ @Singleton
+ public InstancesMetadata instanceMetadata()
+ {
+ int instanceId = 100;
+ String host = "127.0.0.1";
+ InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
+ when(instanceMetadata.host()).thenReturn(host);
+ when(instanceMetadata.port()).thenReturn(9042);
+ when(instanceMetadata.id()).thenReturn(instanceId);
+ when(instanceMetadata.stagingDir()).thenReturn("");
+
+ CassandraAdapterDelegate delegate =
mock(CassandraAdapterDelegate.class);
+
+
when(delegate.storageOperations()).thenReturn(mockStorageOperations);
+ when(instanceMetadata.delegate()).thenReturn(delegate);
+
+ InstancesMetadata mockInstancesMetadata =
mock(InstancesMetadata.class);
+
when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+
when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+
when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+ return mockInstancesMetadata;
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java
new file mode 100644
index 00000000..72df6fb1
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeDrainJobTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link NodeDrainJob}
+ *
+ * <p>The job is exercised against a mocked {@link StorageOperations}; each test stubs the
+ * operation mode the mock reports and checks how the job reacts.
+ */
+class NodeDrainJobTest
+{
+    private static final String OPERATION_MODE_DRAINING = "DRAINING";
+    private static final String OPERATION_MODE_DRAINED = "DRAINED";
+    private static final String OPERATION_MODE_NORMAL = "NORMAL";
+    private static final String OPERATION_MODE_UNKNOWN = "UNKNOWN";
+    private static final String JOB_NAME_DRAIN = "drain";
+
+    private StorageOperations mockStorageOperations;
+    private NodeDrainJob nodeDrainJob;
+    private UUID jobId;
+
+    @BeforeEach
+    void setup()
+    {
+        jobId = UUIDs.timeBased();
+        mockStorageOperations = mock(StorageOperations.class);
+        nodeDrainJob = new NodeDrainJob(jobId, mockStorageOperations);
+    }
+
+    /**
+     * Makes the mocked storage operations report the given operation mode.
+     */
+    private void givenOperationMode(String operationMode)
+    {
+        when(mockStorageOperations.operationMode()).thenReturn(operationMode);
+    }
+
+    /**
+     * Makes drain throw the given exception while the node reports NORMAL, then checks that
+     * executeInternal surfaces it wrapped in an {@link OperationalJobException} after calling drain once.
+     */
+    private void assertDrainFailureIsWrapped(Exception drainFailure) throws IOException, ExecutionException, InterruptedException
+    {
+        givenOperationMode(OPERATION_MODE_NORMAL);
+        doThrow(drainFailure).when(mockStorageOperations).drain();
+
+        assertThatThrownBy(() -> nodeDrainJob.executeInternal())
+        .isInstanceOf(OperationalJobException.class)
+        .hasCause(drainFailure);
+
+        verify(mockStorageOperations).drain();
+    }
+
+    @Test
+    void testJobIdAndName()
+    {
+        assertThat(nodeDrainJob.jobId()).isEqualTo(jobId);
+        assertThat(nodeDrainJob.name()).isEqualTo(JOB_NAME_DRAIN);
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenDraining()
+    {
+        givenOperationMode(OPERATION_MODE_DRAINING);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isTrue();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenDrained()
+    {
+        givenOperationMode(OPERATION_MODE_DRAINED);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenNormal()
+    {
+        givenOperationMode(OPERATION_MODE_NORMAL);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenUnknownState()
+    {
+        givenOperationMode(OPERATION_MODE_UNKNOWN);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testIsRunningOnCassandra_WhenNull()
+    {
+        givenOperationMode(null);
+
+        assertThat(nodeDrainJob.isRunningOnCassandra()).isFalse();
+    }
+
+    @Test
+    void testStatus_WhenDraining()
+    {
+        givenOperationMode(OPERATION_MODE_DRAINING);
+
+        assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.RUNNING);
+    }
+
+    @Test
+    void testStatus_WhenDrained()
+    {
+        givenOperationMode(OPERATION_MODE_DRAINED);
+
+        assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+    }
+
+    @Test
+    void testStatus_WhenNormal()
+    {
+        givenOperationMode(OPERATION_MODE_NORMAL);
+
+        assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.CREATED);
+    }
+
+    @Test
+    void testStatus_WhenUnknownState()
+    {
+        givenOperationMode(OPERATION_MODE_UNKNOWN);
+
+        assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.CREATED);
+    }
+
+    @Test
+    void testStatus_WhenNull()
+    {
+        givenOperationMode(null);
+
+        assertThat(nodeDrainJob.status()).isEqualTo(OperationalJobStatus.CREATED);
+    }
+
+    @Test
+    void testExecuteInternal_WhenNotRunning() throws IOException, ExecutionException, InterruptedException
+    {
+        givenOperationMode(OPERATION_MODE_NORMAL);
+
+        nodeDrainJob.executeInternal();
+
+        verify(mockStorageOperations).drain();
+    }
+
+    @Test
+    void testExecuteInternal_WhenAlreadyDraining() throws IOException, ExecutionException, InterruptedException
+    {
+        givenOperationMode(OPERATION_MODE_DRAINING);
+
+        nodeDrainJob.executeInternal();
+
+        verify(mockStorageOperations, never()).drain();
+    }
+
+    @Test
+    void testExecuteInternal_WhenDrainThrowsIOException() throws IOException, ExecutionException, InterruptedException
+    {
+        assertDrainFailureIsWrapped(new IOException("Drain failed due to IO error"));
+    }
+
+    @Test
+    void testExecuteInternal_WhenDrainThrowsExecutionException() throws IOException, ExecutionException, InterruptedException
+    {
+        assertDrainFailureIsWrapped(new ExecutionException("Drain failed during execution", new RuntimeException()));
+    }
+
+    @Test
+    void testExecuteInternal_WhenDrainThrowsInterruptedException() throws IOException, ExecutionException, InterruptedException
+    {
+        assertDrainFailureIsWrapped(new InterruptedException("Drain was interrupted"));
+    }
+
+    @Test
+    void testNodeDrainStateEnum_FromOperationMode()
+    {
+        // Modes that name a drain state resolve to the matching enum value
+        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_DRAINING))
+        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINING);
+        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_DRAINED))
+        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINED);
+
+        // Everything else, including null and the empty string, resolves to null
+        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_NORMAL)).isNull();
+        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(OPERATION_MODE_UNKNOWN)).isNull();
+        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode(null)).isNull();
+        assertThat(NodeDrainJob.NodeDrainStateEnum.fromOperationMode("")).isNull();
+    }
+
+    @Test
+    void testNodeDrainStateEnum_JobStatusMapping()
+    {
+        assertThat(NodeDrainJob.NodeDrainStateEnum.DRAINING.jobStatus)
+        .isEqualTo(OperationalJobStatus.RUNNING);
+
+        assertThat(NodeDrainJob.NodeDrainStateEnum.DRAINED.jobStatus)
+        .isEqualTo(OperationalJobStatus.SUCCEEDED);
+    }
+
+    @Test
+    void testNodeDrainStateEnum_Values()
+    {
+        assertThat(NodeDrainJob.NodeDrainStateEnum.values())
+        .containsExactly(NodeDrainJob.NodeDrainStateEnum.DRAINING,
+                         NodeDrainJob.NodeDrainStateEnum.DRAINED);
+    }
+
+    @Test
+    void testNodeDrainStateEnum_ValueOf()
+    {
+        assertThat(NodeDrainJob.NodeDrainStateEnum.valueOf(OPERATION_MODE_DRAINING))
+        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINING);
+
+        assertThat(NodeDrainJob.NodeDrainStateEnum.valueOf(OPERATION_MODE_DRAINED))
+        .isEqualTo(NodeDrainJob.NodeDrainStateEnum.DRAINED);
+
+        assertThatThrownBy(() -> NodeDrainJob.NodeDrainStateEnum.valueOf(OPERATION_MODE_NORMAL))
+        .isInstanceOf(IllegalArgumentException.class);
+    }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
index f7b6cfe9..d3574564 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
@@ -19,29 +19,26 @@
package org.apache.cassandra.sidecar.job;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.datastax.driver.core.utils.UUIDs;
-import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.TestResourceReaper;
import
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
-import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests to validate the Job submission behavior for scenarios which are a
combination of values for
@@ -71,59 +68,74 @@ class OperationalJobManagerTest
}
@Test
- void testWithNoDownstreamJob()
+ void testWithNoDownstreamJob() throws InterruptedException
{
OperationalJobTracker tracker = new OperationalJobTracker(4);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
+ CountDownLatch latch = new CountDownLatch(1);
OperationalJob testJob =
OperationalJobTest.createOperationalJob(SUCCEEDED);
- manager.trySubmitJob(testJob);
- testJob.execute(Promise.promise());
+ // The callback releases the latch only when no conflict exception was passed to it
+ BiConsumer<OperationalJob, OperationalJobConflictException> onComplete
= (job, exception) -> {
+ assertThat(exception).isNull();
+ latch.countDown();
+ };
+
+ manager.trySubmitJob(testJob, onComplete, executorPool.service(),
SecondBoundConfiguration.parse("5s"));
+ // Wait (at most 10 seconds) for the callback before inspecting the job and the tracker
+ assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(testJob.asyncResult().isComplete()).isTrue();
assertThat(testJob.status()).isEqualTo(SUCCEEDED);
assertThat(tracker.get(testJob.jobId())).isNotNull();
}
@Test
- void testWithRunningDownstreamJob()
+ void testWithRunningDownstreamJob() throws InterruptedException
{
OperationalJob runningJob =
OperationalJobTest.createOperationalJob(RUNNING);
OperationalJobTracker tracker = new OperationalJobTracker(4);
- ExecutorPools mockPools = mock(ExecutorPools.class);
- TaskExecutorPool mockExecPool = mock(TaskExecutorPool.class);
- when(mockPools.internal()).thenReturn(mockExecPool);
- when(mockExecPool.runBlocking(any())).thenReturn(null);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
- assertThatThrownBy(() -> manager.trySubmitJob(runningJob))
- .isExactlyInstanceOf(OperationalJobConflictException.class)
- .hasMessage("The same operational job is already running on Cassandra.
operationName='Operation X'");
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // The conflict is now delivered through the callback instead of being thrown to the caller
+ BiConsumer<OperationalJob, OperationalJobConflictException> onComplete
= (job, exception) -> {
+
assertThat(exception).isInstanceOf(OperationalJobConflictException.class);
+ assertThat(exception.getMessage()).isEqualTo("The same operational
job is already running on Cassandra. operationName='Operation X'");
+ latch.countDown();
+ };
+
+ manager.trySubmitJob(runningJob, onComplete, executorPool.service(),
SecondBoundConfiguration.parse("5s"));
+ // The latch is released only after both callback assertions passed
+ assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
- void testWithLongRunningJob()
+ void testWithLongRunningJob() throws InterruptedException
{
UUID jobId = UUIDs.timeBased();
-
OperationalJobTracker tracker = new OperationalJobTracker(4);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
+ CountDownLatch latch = new CountDownLatch(1);
- OperationalJob testJob =
OperationalJobTest.createOperationalJob(jobId,
SecondBoundConfiguration.parse("10s"));
+ OperationalJob testJob =
OperationalJobTest.createOperationalJob(jobId,
SecondBoundConfiguration.parse("2s"));
+ BiConsumer<OperationalJob, OperationalJobConflictException> onComplete
= (job, exception) -> {
+ assertThat(exception).isNull();
+ latch.countDown();
+ };
- manager.trySubmitJob(testJob);
- // execute the job async.
- vertx.executeBlocking(testJob::execute);
- // by the time of checking, the job should still be running. It runs for 10 seconds.
+ manager.trySubmitJob(testJob, onComplete, executorPool.service(),
SecondBoundConfiguration.parse("5s"));
+ // Job should be running initially
assertThat(testJob.asyncResult().isComplete()).isFalse();
assertThat(tracker.get(jobId)).isNotNull();
+
+ // Wait for completion
+ assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(testJob.asyncResult().isComplete()).isTrue();
}
@Test
- void testWithFailingJob()
+ void testWithFailingJob() throws InterruptedException
{
UUID jobId = UUIDs.timeBased();
-
OperationalJobTracker tracker = new OperationalJobTracker(4);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
+ CountDownLatch latch = new CountDownLatch(1);
String msg = "Test Job failed";
OperationalJob failingJob = new OperationalJob(jobId)
@@ -141,10 +153,15 @@ class OperationalJobManagerTest
}
};
- manager.trySubmitJob(failingJob);
- failingJob.execute(Promise.promise());
- assertThat(failingJob.asyncResult().isComplete()).isTrue();
- assertThat(failingJob.asyncResult().failed()).isTrue();
+ BiConsumer<OperationalJob, OperationalJobConflictException> onComplete
= (job, exception) -> {
+ assertThat(exception).isNull(); // Exception handled internally by job
+ assertThat(job.asyncResult().isComplete()).isTrue();
+ assertThat(job.asyncResult().failed()).isTrue();
+ latch.countDown();
+ };
+
+ manager.trySubmitJob(failingJob, onComplete, executorPool.service(),
SecondBoundConfiguration.parse("5s"));
+ assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(tracker.get(jobId)).isNotNull();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]