This is an automated email from the ASF dual-hosted git repository.
saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8ab41677 CASSSIDECAR-360: Sidecar API Endpoint for Nodetool Compaction
Stop (#272)
8ab41677 is described below
commit 8ab41677c921dd296ec30efdbda2b9d5273a4f8f
Author: Shalni Sundram <[email protected]>
AuthorDate: Fri Jan 9 17:57:26 2026 -0800
CASSSIDECAR-360: Sidecar API Endpoint for Nodetool Compaction Stop (#272)
Patch by Shalni Sundram; reviewed by Arjun Ashok, Bernardo Botella,
Jyothsna Konisa, Saranya Krishnakumar, Sudipta Laha for CASSSIDECAR-360
---
CHANGES.txt | 1 +
.../sidecar/adapters/base/CassandraAdapter.java | 7 +-
.../base/CassandraCompactionManagerOperations.java | 59 +-
.../sidecar/adapters/base/CompactionType.java | 82 +++
.../base/jmx/CompactionManagerJmxOperations.java | 12 +
.../CassandraCompactionManagerOperationsTest.java | 125 +++++
adapters/adapters-cassandra50/build.gradle | 57 ++
.../adapters/cassandra50/Cassandra50Adapter.java | 70 +++
.../Cassandra50CompactionManagerOperations.java} | 32 +-
.../adapters/cassandra50/Cassandra50Factory.java | 64 +++
.../cassandra50/Cassandra50StorageOperations.java | 58 ++
.../adapters/cassandra50/CompactionType.java | 83 +++
.../cassandra/sidecar/common/ApiEndpointsV1.java | 3 +-
.../sidecar/common/data/CompactionStopStatus.java | 21 +-
.../common/request/CompactionStopRequest.java | 61 +++
.../request/data/CompactionStopRequestPayload.java | 128 +++++
.../common/response/CompactionStopResponse.java | 169 ++++++
.../data/CompactionStopRequestPayloadTest.java | 198 +++++++
.../cassandra/sidecar/client/RequestContext.java | 14 +
.../cassandra/sidecar/client/SidecarClient.java | 18 +
.../sidecar/client/SidecarClientTest.java | 68 ++-
integration-framework/build.gradle | 1 +
.../org/apache/cassandra/testing/TestUtils.java | 2 +
.../routes/CompactionStopIntegrationTest.java | 592 +++++++++++++++++++++
.../common/server/CompactionManagerOperations.java | 20 +
server/build.gradle | 1 +
.../acl/authorization/BasicPermissions.java | 1 +
.../sidecar/handlers/CompactionStopHandler.java | 165 ++++++
.../sidecar/modules/CassandraOperationsModule.java | 23 +
.../sidecar/modules/ConfigurationModule.java | 4 +-
.../modules/multibindings/VertxRouteMapKeys.java | 5 +
.../testing/CassandraSidecarTestContext.java | 2 +
.../handlers/CompactionStopHandlerTest.java | 333 ++++++++++++
settings.gradle | 1 +
34 files changed, 2445 insertions(+), 35 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d2baf2aa..c4f47450 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Sidecar API Endpoint for Nodetool Compaction Stop (CASSSIDECAR-360)
* Implementation of CDCPublisher (CASSSIDECAR-243)
* Sidecar endpoint for moving a node to a new token (CASSSIDECAR-344)
* Returning JSON responses for live migration status endpoints in case of
errors (CASSSIDECAR-395)
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index 556c405a..d8bc9eea 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -81,8 +81,11 @@ public class CassandraAdapter implements ICassandraAdapter
this.tableOperations =
Objects.requireNonNull(createTableOperations(jmxClient), "tableOperations is
required");
this.compactionManagerOperations =
Objects.requireNonNull(createCompactionManagerOperations(jmxClient),
"compactionManagerOperations is required");
this.metricsOperations =
Objects.requireNonNull(createMetricsOperations(jmxClient, tableSchemaFetcher),
"metricsOperations is required");
- this.compactionStatsOperations =
Objects.requireNonNull(createCompactionStatsOperations(storageOperations,
metricsOperations,
-
compactionManagerOperations), "compactionStatsOperations is
required");
+ this.compactionStatsOperations
+ =
Objects.requireNonNull(createCompactionStatsOperations(storageOperations,
+
metricsOperations,
+
compactionManagerOperations),
+ "compactionStatsOperations is required");
}
/**
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java
index a2de738a..55421bce 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.sidecar.adapters.base;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
import
org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations;
import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
@@ -32,6 +34,11 @@ import static
org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJm
*/
public class CassandraCompactionManagerOperations implements
CompactionManagerOperations
{
+ private static final List<String> SUPPORTED_COMPACTION_TYPES =
+ Arrays.stream(CompactionType.values())
+ .map(CompactionType::name)
+ .collect(Collectors.toList());
+
protected final JmxClient jmxClient;
/**
@@ -53,4 +60,54 @@ public class CassandraCompactionManagerOperations implements
CompactionManagerOp
return jmxClient.proxy(CompactionManagerJmxOperations.class,
COMPACTION_MANAGER_OBJ_NAME)
.getCompactions();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stopCompactionById(String compactionId)
+ {
+ // compactionId takes precedence over type if both are provided
+ if (compactionId != null && !compactionId.trim().isEmpty())
+ {
+ CompactionManagerJmxOperations proxy =
jmxClient.proxy(CompactionManagerJmxOperations.class,
+
COMPACTION_MANAGER_OBJ_NAME);
+ proxy.stopCompactionById(compactionId);
+ }
+ else
+ {
+ throw new IllegalArgumentException("compaction process with
compaction ID "
+ + compactionId + " is null or
empty");
+ }
+ }
+
+ @Override
+ public void stopCompaction(String compactionType)
+ {
+ if (compactionType != null && !compactionType.trim().isEmpty())
+ {
+ if (supportedCompactionTypes().contains(compactionType))
+ {
+ CompactionManagerJmxOperations proxy =
jmxClient.proxy(CompactionManagerJmxOperations.class,
+ COMPACTION_MANAGER_OBJ_NAME);
+ String errMsg
+ = "compaction process with compaction type " +
compactionType + " must not be null when compactionId is not provided";
+ proxy.stopCompaction(Objects.requireNonNull(compactionType,
errMsg));
+ }
+ else
+ {
+ throw new IllegalArgumentException("compaction type " +
compactionType + " is not supported");
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("compaction type " +
compactionType + " is null or empty");
+ }
+ }
+
+ @Override
+ public List<String> supportedCompactionTypes()
+ {
+ return SUPPORTED_COMPACTION_TYPES;
+ }
}
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CompactionType.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CompactionType.java
new file mode 100644
index 00000000..54db89c1
--- /dev/null
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CompactionType.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.adapters.base;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+/**
+ * Supported compaction types based on Cassandra's OperationType enum
+ */
+public enum CompactionType
+{
+ CLEANUP,
+ SCRUB,
+ UPGRADE_SSTABLES,
+ VERIFY,
+ RELOCATE,
+ GARBAGE_COLLECT,
+ ANTICOMPACTION,
+ VALIDATION,
+ INDEX_BUILD,
+ VIEW_BUILD,
+ COMPACTION,
+ TOMBSTONE_COMPACTION,
+ KEY_CACHE_SAVE,
+ ROW_CACHE_SAVE,
+ COUNTER_CACHE_SAVE,
+ INDEX_SUMMARY;
+
+ private static final List<String> SUPPORTED_COMPACTION_TYPES =
+
Arrays.stream(org.apache.cassandra.sidecar.adapters.base.CompactionType.values())
+
.map(org.apache.cassandra.sidecar.adapters.base.CompactionType::name)
+ .collect(Collectors.toList());
+
+ @Override
+ public String toString()
+ {
+ return name();
+ }
+
+ /**
+ * Case-insensitive factory method for Jackson deserialization
+ * @return {@link CompactionType} from string
+ */
+ @JsonCreator
+ public static CompactionType fromString(String name)
+ {
+ if (name == null || name.trim().isEmpty())
+ {
+ return null;
+ }
+ try
+ {
+ return valueOf(name.trim().toUpperCase(Locale.ROOT));
+ }
+ catch (IllegalArgumentException unknownEnum)
+ {
+ throw new IllegalArgumentException(
+ String.format("Unsupported compactionType: '%s'. Valid
types are: %s",
+ name, SUPPORTED_COMPACTION_TYPES));
+ }
+ }
+}
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java
index a054c91d..aad573cd 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/CompactionManagerJmxOperations.java
@@ -34,4 +34,16 @@ public interface CompactionManagerJmxOperations
* @return list of compaction info maps
*/
List<Map<String, String>> getCompactions();
+
+ /**
+ * Stop compaction by type
+ * @throws IllegalArgumentException when compaction type is null
+ */
+ void stopCompaction(String type);
+
+ /**
+ * Stop compaction by ID
+ * @throws IllegalArgumentException when compaction ID is null
+ */
+ void stopCompactionById(String compactionId);
}
diff --git
a/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperationsTest.java
b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperationsTest.java
new file mode 100644
index 00000000..ce57869d
--- /dev/null
+++
b/adapters/adapters-base/src/test/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperationsTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import
org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations;
+import org.apache.cassandra.sidecar.common.server.JmxClient;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static
org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations.COMPACTION_MANAGER_OBJ_NAME;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link CassandraCompactionManagerOperations} class
+ */
+class CassandraCompactionManagerOperationsTest
+{
+ private CassandraCompactionManagerOperations compactionManagerOperations;
+ private JmxClient mockJmxClient;
+ private CompactionManagerJmxOperations mockJmxOperations;
+
+ @BeforeEach
+ void setUp()
+ {
+ mockJmxClient = mock(JmxClient.class);
+ mockJmxOperations = mock(CompactionManagerJmxOperations.class);
+ compactionManagerOperations = new
CassandraCompactionManagerOperations(mockJmxClient);
+
+ // Setup JMX proxy mock
+ when(mockJmxClient.proxy(CompactionManagerJmxOperations.class,
COMPACTION_MANAGER_OBJ_NAME))
+ .thenReturn(mockJmxOperations);
+ }
+
+ @Test
+ void testStopCompactionByIdOnly()
+ {
+ // Test stopCompactionById called when providing compactionId
+ String compactionId = "abc-123";
+ compactionManagerOperations.stopCompactionById(compactionId);
+
+ verify(mockJmxOperations, times(1)).stopCompactionById(compactionId);
+ verify(mockJmxOperations,
times(0)).stopCompaction(org.mockito.ArgumentMatchers.anyString());
+ }
+
+ @Test
+ void testStopCompactionByTypeOnly()
+ {
+ // Test stopCompaction called when no compactionId provided
+ String compactionType = "COMPACTION";
+ compactionManagerOperations.stopCompaction(compactionType);
+
+ verify(mockJmxOperations, times(1)).stopCompaction(compactionType);
+ verify(mockJmxOperations,
times(0)).stopCompactionById(org.mockito.ArgumentMatchers.anyString());
+ }
+
+ @Test
+ void testStopCompactionByIdWithWhitespace()
+ {
+ // Test trim does not result in empty string
+ String compactionId = " abc-123 ";
+ compactionManagerOperations.stopCompactionById(compactionId);
+
+ verify(mockJmxOperations, times(1)).stopCompactionById(compactionId);
+ verify(mockJmxOperations,
times(0)).stopCompaction(org.mockito.ArgumentMatchers.anyString());
+ }
+
+ @Test
+ void testStopCompactionAllSupportedTypes()
+ {
+ // Test no failures upon any supported type being provided as param
+ String[] supportedTypes = {
+ "COMPACTION", "VALIDATION", "KEY_CACHE_SAVE", "ROW_CACHE_SAVE",
+ "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES",
+ "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION",
+ "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE",
+ "GARBAGE_COLLECT"
+ };
+
+ for (String type : supportedTypes)
+ {
+ compactionManagerOperations.stopCompaction(type);
+ verify(mockJmxOperations, times(1)).stopCompaction(type);
+ }
+ }
+
+ @Test
+ void testStopCompactionCatchesUnsupportedType()
+ {
+ String compactionType = "MAJOR_COMPACTION";
+ assertThrows(IllegalArgumentException.class,
+ () ->
compactionManagerOperations.stopCompaction(compactionType));
+ }
+
+ @Test
+ void testStopCompactionJmxProxyCalledOnce()
+ {
+ // Test JMX proxy obtained exactly once per call
+ compactionManagerOperations.stopCompactionById("test-id");
+
+ verify(mockJmxClient, times(1))
+ .proxy(CompactionManagerJmxOperations.class,
COMPACTION_MANAGER_OBJ_NAME);
+ }
+}
diff --git a/adapters/adapters-cassandra50/build.gradle
b/adapters/adapters-cassandra50/build.gradle
new file mode 100644
index 00000000..b52a18d6
--- /dev/null
+++ b/adapters/adapters-cassandra50/build.gradle
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.nio.file.Paths
+
+plugins {
+ id 'java-library'
+ id 'idea'
+ id 'maven-publish'
+ id "com.github.spotbugs"
+}
+
+apply from: "$rootDir/gradle/common/publishing.gradle"
+
+sourceCompatibility = JavaVersion.VERSION_11
+
+test {
+ useJUnitPlatform()
+ maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
+ reports {
+ junitXml.setRequired(true)
+ def destDir = Paths.get(rootProject.rootDir.absolutePath, "build",
"test-results", "adapters-cassandra50").toFile()
+ println("Destination directory for adapters-cassandra50 tests:
${destDir}")
+ junitXml.getOutputLocation().set(destDir)
+ html.setRequired(true)
+ html.getOutputLocation().set(destDir)
+ }
+}
+
+dependencies {
+ api(project(":server-common"))
+ api(project(":adapters:adapters-base"))
+ api(project(":adapters:adapters-cassandra41"))
+
+ compileOnly('org.jetbrains:annotations:23.0.0')
+ compileOnly('com.datastax.cassandra:cassandra-driver-core:3.11.3')
+ implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
+}
+
+spotbugsTest.enabled = false
\ No newline at end of file
diff --git
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
new file mode 100644
index 00000000..f7ac798b
--- /dev/null
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra50;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.sidecar.adapters.base.CassandraAdapter;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
+import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
+import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A {@link ICassandraAdapter} implementation for Cassandra 5.0
+ */
+public class Cassandra50Adapter extends CassandraAdapter
+{
+ public Cassandra50Adapter(DnsResolver dnsResolver,
+ JmxClient jmxClient,
+ CQLSessionProvider session,
+ InetSocketAddress localNativeTransportAddress,
+ DriverUtils driverUtils,
+ TableSchemaFetcher tableSchemaFetcher)
+ {
+ super(dnsResolver, jmxClient, session, localNativeTransportAddress,
driverUtils, tableSchemaFetcher);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ @NotNull
+ protected StorageOperations createStorageOperations(DnsResolver
dnsResolver, JmxClient jmxClient)
+ {
+ return new Cassandra50StorageOperations(jmxClient, dnsResolver);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Returns Cassandra 4.x-specific CompactionManagerOperations that
excludes unsupported types
+ */
+ @Override
+ @NotNull
+ protected CompactionManagerOperations
createCompactionManagerOperations(JmxClient jmxClient)
+ {
+ return new Cassandra50CompactionManagerOperations(jmxClient);
+ }
+}
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50CompactionManagerOperations.java
similarity index 52%
copy from
adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java
copy to
adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50CompactionManagerOperations.java
index a2de738a..a6f6094f 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraCompactionManagerOperations.java
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50CompactionManagerOperations.java
@@ -16,41 +16,43 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.adapters.base;
+package org.apache.cassandra.sidecar.adapters.cassandra50;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
-import
org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations;
-import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
+import
org.apache.cassandra.sidecar.adapters.base.CassandraCompactionManagerOperations;
import org.apache.cassandra.sidecar.common.server.JmxClient;
-import static
org.apache.cassandra.sidecar.adapters.base.jmx.CompactionManagerJmxOperations.COMPACTION_MANAGER_OBJ_NAME;
-
/**
- * Default implementation that pulls methods from the Cassandra
CompactionManager Proxy
+ * Cassandra 5.0 implementation of CompactionManagerOperations that supports
MAJOR_COMPACTION
+ * which is only available in Cassandra 5.x versions
*/
-public class CassandraCompactionManagerOperations implements
CompactionManagerOperations
+public class Cassandra50CompactionManagerOperations extends
CassandraCompactionManagerOperations
{
- protected final JmxClient jmxClient;
-
+ private static final List<String> SUPPORTED_COMPACTION_TYPES =
+
Arrays.stream(org.apache.cassandra.sidecar.adapters.cassandra50.CompactionType.values())
+
.map(org.apache.cassandra.sidecar.adapters.cassandra50.CompactionType::name)
+ .collect(Collectors.toList());
/**
* Creates a new instance with the provided {@link JmxClient}
*
* @param jmxClient the JMX client used to communicate with the Cassandra
instance
*/
- public CassandraCompactionManagerOperations(JmxClient jmxClient)
+ public Cassandra50CompactionManagerOperations(JmxClient jmxClient)
{
- this.jmxClient = jmxClient;
+ super(jmxClient);
}
/**
* {@inheritDoc}
+ *
+ * Cassandra 5.x supports MAJOR_COMPACTION operation type
*/
@Override
- public List<Map<String, String>> getCompactions()
+ public List<String> supportedCompactionTypes()
{
- return jmxClient.proxy(CompactionManagerJmxOperations.class,
COMPACTION_MANAGER_OBJ_NAME)
- .getCompactions();
+ return SUPPORTED_COMPACTION_TYPES;
}
}
diff --git
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Factory.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Factory.java
new file mode 100644
index 00000000..131a2e02
--- /dev/null
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Factory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra50;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.server.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.MinimumVersion;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
+import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher;
+
+/**
+ * Factory to produce the Cassandra 5.x adapter
+ */
+@MinimumVersion("5.0.0")
+public class Cassandra50Factory implements ICassandraFactory
+{
+ private final DnsResolver dnsResolver;
+ private final DriverUtils driverUtils;
+ private final TableSchemaFetcher tableSchemaFetcher;
+
+ public Cassandra50Factory(DnsResolver dnsResolver, DriverUtils
driverUtils, TableSchemaFetcher tableSchemaFetcher)
+ {
+ this.dnsResolver = Objects.requireNonNull(dnsResolver, "dnsResolver is
required");
+ this.driverUtils = Objects.requireNonNull(driverUtils, "driverUtils is
required");
+ this.tableSchemaFetcher = Objects.requireNonNull(tableSchemaFetcher,
"tableSchemaFetcher is required");
+ }
+
+ /**
+ * Returns a new adapter for Cassandra 5.x clusters.
+ *
+ * @param session the session to the Cassandra database
+ * @param jmxClient the JMX client to connect to the
Cassandra database
+ * @param localNativeTransportAddress the native transport address and
port of the instance
+ * @return a new adapter for the 5.x clusters
+ */
+ @Override
+ public ICassandraAdapter create(CQLSessionProvider session, JmxClient
jmxClient,
+ InetSocketAddress
localNativeTransportAddress)
+ {
+ return new Cassandra50Adapter(dnsResolver, jmxClient, session,
localNativeTransportAddress, driverUtils, tableSchemaFetcher);
+ }
+}
diff --git
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50StorageOperations.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50StorageOperations.java
new file mode 100644
index 00000000..6e029d1f
--- /dev/null
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50StorageOperations.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra50;
+
+import org.apache.cassandra.sidecar.adapters.base.RingProvider;
+import org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicaProvider;
+import
org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41StorageOperations;
+import org.apache.cassandra.sidecar.common.server.JmxClient;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+
+/**
+ * An implementation of the {@link StorageOperations} that interfaces with
Cassandra 5.0 and later
+ */
+public class Cassandra50StorageOperations extends Cassandra41StorageOperations
+{
+ /**
+ * Creates a new instance with the provided {@link JmxClient} and {@link
DnsResolver}
+ *
+ * @param jmxClient the JMX client used to communicate with the
Cassandra instance
+ * @param dnsResolver the DNS resolver used to lookup replicas
+ */
+ public Cassandra50StorageOperations(JmxClient jmxClient, DnsResolver
dnsResolver)
+ {
+ super(jmxClient, dnsResolver);
+ }
+
+ /**
+ * Creates a new instances with the provided {@link JmxClient}, {@link
RingProvider}, and
+ * {@link TokenRangeReplicaProvider}. This constructor is exposed for
extensibility.
+ *
+ * @param jmxClient the JMX client used to communicate
with the Cassandra instance
+ * @param ringProvider the ring provider instance
+ * @param tokenRangeReplicaProvider the token range replica provider
+ */
+ public Cassandra50StorageOperations(JmxClient jmxClient,
+ RingProvider ringProvider,
+ TokenRangeReplicaProvider
tokenRangeReplicaProvider)
+ {
+ super(jmxClient, ringProvider, tokenRangeReplicaProvider);
+ }
+}
diff --git
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/CompactionType.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/CompactionType.java
new file mode 100644
index 00000000..202831fc
--- /dev/null
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/CompactionType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.adapters.cassandra50;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+/**
+ * Supported compaction types based on Cassandra's OperationType enum
+ */
+public enum CompactionType
+{
+ CLEANUP,
+ SCRUB,
+ UPGRADE_SSTABLES,
+ VERIFY,
+ RELOCATE,
+ GARBAGE_COLLECT,
+ ANTICOMPACTION,
+ VALIDATION,
+ INDEX_BUILD,
+ VIEW_BUILD,
+ COMPACTION,
+ TOMBSTONE_COMPACTION,
+ KEY_CACHE_SAVE,
+ ROW_CACHE_SAVE,
+ COUNTER_CACHE_SAVE,
+ INDEX_SUMMARY,
+ MAJOR_COMPACTION;
+
+ private static final List<String> SUPPORTED_COMPACTION_TYPES =
+
Arrays.stream(org.apache.cassandra.sidecar.adapters.base.CompactionType.values())
+
.map(org.apache.cassandra.sidecar.adapters.base.CompactionType::name)
+ .collect(Collectors.toList());
+
+ @Override
+ public String toString()
+ {
+ return name();
+ }
+
+ /**
+ * Case-insensitive factory method for Jackson deserialization
+ * @return {@link CompactionType} from string
+ */
+ @JsonCreator
+ public static CompactionType fromString(String name)
+ {
+ if (name == null || name.trim().isEmpty())
+ {
+ return null;
+ }
+ try
+ {
+ return valueOf(name.trim().toUpperCase(Locale.ROOT));
+ }
+ catch (IllegalArgumentException unknownEnum)
+ {
+ throw new IllegalArgumentException(
+ String.format("Unsupported compactionType: '%s'. Valid
types are: %s",
+ name, SUPPORTED_COMPACTION_TYPES));
+ }
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 8af3b966..8269bf04 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -139,6 +139,7 @@ public final class ApiEndpointsV1
public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 +
CASSANDRA + "/stats/connected-clients";
public static final String COMPACTION_STATS_ROUTE = API_V1 + CASSANDRA +
"/stats/compaction";
+ private static final String OPERATION_ROUTE = "/operations";
private static final String OPERATIONAL_JOBS = "/operational-jobs";
private static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' +
OPERATIONAL_JOB_ID_PATH_PARAM;
public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 +
CASSANDRA + OPERATIONAL_JOBS;
@@ -148,7 +149,7 @@ public final class ApiEndpointsV1
public static final String NODE_DRAIN_ROUTE = API_V1 + CASSANDRA +
"/operations/drain";
public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA +
"/stats/streams";
public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA +
PER_KEYSPACE + PER_TABLE + "/stats";
-
+ public static final String COMPACTION_STOP_ROUTE = API_V1 + CASSANDRA +
OPERATION_ROUTE + "/compaction/stop";
// Live Migration APIs
public static final String LIVE_MIGRATION_API_PREFIX = API_V1 +
"/live-migration";
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/CompactionStopStatus.java
similarity index 68%
copy from
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java
copy to
client-common/src/main/java/org/apache/cassandra/sidecar/common/data/CompactionStopStatus.java
index eddc6d7f..a2c9e3ae 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/CompactionStopStatus.java
@@ -15,21 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.cassandra.sidecar.common.server;
-
-import java.util.List;
-import java.util.Map;
+package org.apache.cassandra.sidecar.common.data;
/**
- * An interface that defines the compaction manager operations exposed by
Sidecar
+ * Status values for compaction stop operations
*/
-public interface CompactionManagerOperations
+public enum CompactionStopStatus
{
/**
- * Returns active compactions as a list of compaction info maps
- *
- * @return list of compaction info maps
+ * Compaction stop request submitted to Cassandra - ongoing compactions
now stopping
+ */
+ SUBMITTED,
+
+ /**
+ * Compaction stop request failed
*/
- List<Map<String, String>> getCompactions();
+ FAILED;
}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/CompactionStopRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/CompactionStopRequest.java
new file mode 100644
index 00000000..36b27224
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/CompactionStopRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import
org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload;
+import org.apache.cassandra.sidecar.common.response.CompactionStopResponse;
+
+/**
+ * Represents a request to execute compaction stop operation
+ */
+public class CompactionStopRequest extends JsonRequest<CompactionStopResponse>
+{
+ private final CompactionStopRequestPayload payload;
+
+ /**
+ * Constructs a request to execute a compaction stop operation with a
payload
+ *
+ * @param payload the payload containing compaction type or ID to stop
+ */
+ public CompactionStopRequest(CompactionStopRequestPayload payload)
+ {
+ super(ApiEndpointsV1.COMPACTION_STOP_ROUTE);
+ this.payload = payload;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.PUT;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Object requestBody()
+ {
+ return payload;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayload.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayload.java
new file mode 100644
index 00000000..b9998a4e
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayload.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request.data;
+
+import java.util.Locale;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Request payload for stopping compaction operations.
+ *
+ * <p>Valid JSON:</p>
+ * <pre>
+ * { "compactionType": "COMPACTION", "compactionId": "abc-123" }
+ * { "compactionType": "VALIDATION" }
+ * </pre>
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CompactionStopRequestPayload
+{
+ public static final String COMPACTION_TYPE_KEY = "compactionType";
+ public static final String COMPACTION_ID_KEY = "compactionId";
+
+ private final String compactionType;
+ private final String compactionId;
+
+ /**
+ * Creates a new CompactionStopRequestPayload
+ *
+ * @param compactionType the type of compaction to stop (e.g., COMPACTION,
VALIDATION, etc.)
+ * @param compactionId optional ID of a specific compaction to stop
+ */
+ @JsonCreator
+ public CompactionStopRequestPayload(@JsonProperty(value =
COMPACTION_TYPE_KEY) String compactionType,
+ @JsonProperty(value =
COMPACTION_ID_KEY) String compactionId)
+ {
+ // Normalize compactionType: trim whitespace and convert to uppercase
+ this.compactionType = normalizeCompactionType(compactionType);
+ this.compactionId = compactionId;
+ }
+
+ /**
+ * Normalizes the compaction type by trimming whitespace and converting to
uppercase.
+ * Returns null for null or empty strings.
+ *
+ * @param compactionType the raw compaction type string
+ * @return normalized compaction type or null
+ */
+ private static String normalizeCompactionType(String compactionType)
+ {
+ if (compactionType == null || compactionType.trim().isEmpty())
+ {
+ return null;
+ }
+ return compactionType.trim().toUpperCase(Locale.ROOT);
+ }
+
+ /**
+ * @return the type of compaction to stop
+ */
+ @JsonProperty(COMPACTION_TYPE_KEY)
+ public String compactionType()
+ {
+ return this.compactionType;
+ }
+
+ /**
+ * @return the ID of a specific compaction to stop, or null to stop all
specified type
+ */
+ @JsonProperty(COMPACTION_ID_KEY)
+ public String compactionId()
+ {
+ return this.compactionId != null ? this.compactionId.trim() : null;
+ }
+
+ /**
+ * Checks compaction ID valid - not null or empty post-trim
+ * */
+ public boolean hasValidCompactionId()
+ {
+ return this.compactionId != null &&
!this.compactionId.trim().isEmpty();
+ }
+
+ /**
+ * Checks compaction type not null and not empty for invalid compactionId
cases
+ * */
+ public boolean hasValidCompactionType()
+ {
+ return this.compactionType != null && !this.compactionType.isEmpty();
+ }
+
+ /**
+ * Checks at least one valid parameter provided
+ *
+ * @return true if either compaction ID or compaction type is valid, false
otherwise
+ */
+ public boolean atLeastOneParamProvided()
+ {
+ return hasValidCompactionId() || hasValidCompactionType();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CompactionStopRequestPayload{" +
+ "compactionType='" + compactionType + "'" +
+ ", compactionId='" + compactionId + "'" +
+ "}";
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/CompactionStopResponse.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/CompactionStopResponse.java
new file mode 100644
index 00000000..ebdbac7b
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/CompactionStopResponse.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import org.apache.cassandra.sidecar.common.data.CompactionStopStatus;
+
+/**
+ * Response class for the Compaction Stop API
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CompactionStopResponse
+{
+ public static final String COMPACTION_TYPE_KEY = "compactionType";
+ public static final String COMPACTION_ID_KEY = "compactionId";
+ public static final String STATUS_KEY = "status";
+ private final String compactionType;
+ private final String compactionId;
+ private final CompactionStopStatus status;
+
+ private CompactionStopResponse(Builder builder)
+ {
+ this.compactionType = builder.compactionType;
+ this.compactionId = builder.compactionId;
+ this.status = builder.status;
+ }
+
+ /**
+ * Constructs a new {@link CompactionStopResponse}.
+ *
+ * @param compactionType the type of compaction that was requested to stop
+ * @param compactionId the ID of the compaction that was requested to
stop
+ * @param status the status of the stop operation (e.g.,
"PENDING", "FAILED")
+ */
+ @JsonCreator
+ public CompactionStopResponse(@JsonProperty(COMPACTION_TYPE_KEY) String
compactionType,
+ @JsonProperty(COMPACTION_ID_KEY) String
compactionId,
+ @JsonProperty(STATUS_KEY)
CompactionStopStatus status)
+ {
+ this.compactionType = compactionType;
+ this.compactionId = compactionId;
+ this.status = status;
+ }
+
+ /**
+ * @return the type of compaction that was requested to stop
+ */
+ @JsonProperty(COMPACTION_TYPE_KEY)
+ public String compactionType()
+ {
+ return compactionType;
+ }
+
+ /**
+ * @return the ID of the compaction that was requested to stop
+ */
+ @JsonProperty(COMPACTION_ID_KEY)
+ public String compactionId()
+ {
+ return compactionId;
+ }
+
+ /**
+ * @return the status of the stop operation
+ */
+ @JsonProperty(STATUS_KEY)
+ public CompactionStopStatus status()
+ {
+ return status;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format(
+ "CompactionStopResponse{compactionType='%s',
compactionId='%s', status='%s'}",
+ compactionType, compactionId, status
+ );
+ }
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ /**
+ * {@code CompactionStopResponse} builder static inner class.
+ */
+ public static final class Builder implements DataObjectBuilder<Builder,
CompactionStopResponse>
+ {
+ private String compactionType;
+ private String compactionId;
+ private CompactionStopStatus status;
+
+ private Builder()
+ {
+ }
+
+ @Override
+ public Builder self()
+ {
+ return this;
+ }
+
+ /**
+ * Sets the {@code compactionType} and returns a reference to this
Builder enabling method chaining.
+ *
+ * @param compactionType the {@code compactionType} to set
+ * @return a reference to this Builder
+ */
+ public Builder compactionType(String compactionType)
+ {
+ return update(b -> b.compactionType = compactionType);
+ }
+
+ /**
+ * Sets the {@code compactionId} and returns a reference to this
Builder enabling method chaining.
+ *
+ * @param compactionId the {@code compactionId} to set
+ * @return a reference to this Builder
+ */
+ public Builder compactionId(String compactionId)
+ {
+ return update(b -> b.compactionId = compactionId);
+ }
+
+ /**
+ * Sets the {@code status} and returns a reference to this Builder
enabling method chaining.
+ *
+ * @param status the {@code status} to set
+ * @return a reference to this Builder
+ */
+ public Builder status(CompactionStopStatus status)
+ {
+ return update(b -> b.status = status);
+ }
+
+
+ /**
+ * Returns a {@code CompactionStopResponse} built from the parameters
previously set.
+ *
+ * @return a {@code CompactionStopResponse} built with parameters of
this {@code CompactionStopResponse.Builder}
+ */
+ @Override
+ public CompactionStopResponse build()
+ {
+ return new CompactionStopResponse(this);
+ }
+ }
+}
diff --git
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayloadTest.java
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayloadTest.java
new file mode 100644
index 00000000..80e86ea3
--- /dev/null
+++
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/data/CompactionStopRequestPayloadTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request.data;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link CompactionStopRequestPayload} serialization and
deserialization
+ */
+class CompactionStopRequestPayloadTest
+{
+ private static final ObjectMapper MAPPER
+ = new
ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ /**
+ * All known compaction types across all Cassandra versions for testing
purposes
+ */
+ private static final String[] ALL_COMPACTION_TYPES = {
+ "CLEANUP", "SCRUB", "UPGRADE_SSTABLES", "VERIFY", "RELOCATE",
+ "GARBAGE_COLLECT", "ANTICOMPACTION", "VALIDATION", "INDEX_BUILD",
+ "VIEW_BUILD", "COMPACTION", "TOMBSTONE_COMPACTION", "KEY_CACHE_SAVE",
+ "ROW_CACHE_SAVE", "COUNTER_CACHE_SAVE", "INDEX_SUMMARY",
"MAJOR_COMPACTION"
+ };
+
+ @Test
+ void testSerDeserWithBothFields() throws JsonProcessingException
+ {
+ CompactionStopRequestPayload payload = new
CompactionStopRequestPayload("COMPACTION", "abc-123");
+ String json = MAPPER.writeValueAsString(payload);
+
assertThat(json).isEqualTo("{\"compactionType\":\"COMPACTION\",\"compactionId\":\"abc-123\"}");
+
+ CompactionStopRequestPayload deser = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(deser.compactionType()).isEqualTo(payload.compactionType());
+ assertThat(deser.compactionId()).isEqualTo(payload.compactionId());
+ }
+
+ @Test
+ void testSerDeserWithTypeOnly() throws JsonProcessingException
+ {
+ CompactionStopRequestPayload payload = new
CompactionStopRequestPayload("VALIDATION", null);
+ String json = MAPPER.writeValueAsString(payload);
+ assertThat(json).isEqualTo("{\"compactionType\":\"VALIDATION\"}");
+
+ CompactionStopRequestPayload deser = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(deser.compactionType()).isEqualTo("VALIDATION");
+ assertThat(deser.compactionId()).isNull();
+ }
+
+ @Test
+ void testSerDeserWithIdOnly() throws JsonProcessingException
+ {
+ CompactionStopRequestPayload payload = new
CompactionStopRequestPayload(null, "xyz-456");
+ String json = MAPPER.writeValueAsString(payload);
+ assertThat(json).isEqualTo("{\"compactionId\":\"xyz-456\"}");
+
+ CompactionStopRequestPayload deser = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(deser.compactionType()).isNull();
+ assertThat(deser.compactionId()).isEqualTo("xyz-456");
+ }
+
+ @Test
+ void testSerDeserWithBothNull() throws JsonProcessingException
+ {
+ CompactionStopRequestPayload payload = new
CompactionStopRequestPayload(null, null);
+ String json = MAPPER.writeValueAsString(payload);
+ assertThat(json).isEqualTo("{}");
+
+ CompactionStopRequestPayload deser = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(deser.compactionType()).isNull();
+ assertThat(deser.compactionId()).isNull();
+ }
+
+ @Test
+ void testDeserFromJsonWithBothFields() throws JsonProcessingException
+ {
+ String json =
"{\"compactionType\":\"CLEANUP\",\"compactionId\":\"test-123\"}";
+ CompactionStopRequestPayload payload = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(payload.compactionType()).isEqualTo("CLEANUP");
+ assertThat(payload.compactionId()).isEqualTo("test-123");
+ }
+
+ @Test
+ void testDeserFromJsonWithTypeOnly() throws JsonProcessingException
+ {
+ String json = "{\"compactionType\":\"SCRUB\"}";
+ CompactionStopRequestPayload payload = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(payload.compactionType()).isEqualTo("SCRUB");
+ assertThat(payload.compactionId()).isNull();
+ }
+
+ @Test
+ void testDeserFromJsonWithIdOnly() throws JsonProcessingException
+ {
+ String json = "{\"compactionId\":\"unique-compaction-id\"}";
+ CompactionStopRequestPayload payload = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(payload.compactionType()).isNull();
+ assertThat(payload.compactionId()).isEqualTo("unique-compaction-id");
+ }
+
+ @Test
+ void testDeserFromEmptyJson() throws JsonProcessingException
+ {
+ String json = "{}";
+ CompactionStopRequestPayload payload = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(payload.compactionType()).isNull();
+ assertThat(payload.compactionId()).isNull();
+ }
+
+ @Test
+ void testDeserializeWithEmptyStrings() throws JsonProcessingException
+ {
+ String json = "{\"compactionType\":\"\",\"compactionId\":\"\"}";
+ CompactionStopRequestPayload payload = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(payload.compactionType()).isNull();
+ assertThat(payload.compactionId()).isEmpty();
+ }
+
+ @Test
+ void testDeserializeWithWhitespace() throws JsonProcessingException
+ {
+ String json = "{\"compactionType\":\" COMPACTION
\",\"compactionId\":\" test-id \"}";
+ CompactionStopRequestPayload payload = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(payload.compactionType()).isEqualTo("COMPACTION");
+ assertThat(payload.compactionId()).isEqualTo("test-id");
+ }
+
+ @Test
+ void testToString()
+ {
+ CompactionStopRequestPayload payload = new
CompactionStopRequestPayload("COMPACTION", "abc-123");
+ String toString = payload.toString();
+ assertThat(toString).contains("compaction");
+ assertThat(toString).contains("abc-123");
+ assertThat(toString).contains("CompactionStopRequestPayload");
+ }
+
+ @Test
+ void testAllSupportedCompactionTypes() throws JsonProcessingException
+ {
+ // Check each compactionType field for CompactionStopRequestPayload is
serialized/deserialized correctly
+ for (String compactionType : ALL_COMPACTION_TYPES)
+ {
+ CompactionStopRequestPayload payload = new
CompactionStopRequestPayload(compactionType, null);
+ String json = MAPPER.writeValueAsString(payload);
+ assertThat(json).contains(compactionType);
+
+ CompactionStopRequestPayload deser = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(deser.compactionType()).isEqualTo(compactionType);
+ }
+ }
+
+ @Test
+ void testCasePreservation() throws JsonProcessingException
+ {
+ // Test preserved in serialization/deserialization
+ CompactionStopRequestPayload lowerCase = new
CompactionStopRequestPayload("COMPACTION", "Test-ID-123");
+ String json = MAPPER.writeValueAsString(lowerCase);
+ assertThat(json).contains("compaction");
+ assertThat(json).contains("Test-ID-123");
+
+ CompactionStopRequestPayload deser = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(deser.compactionType()).isEqualTo("COMPACTION");
+ assertThat(deser.compactionId()).isEqualTo("Test-ID-123");
+ }
+
+ @Test
+ void testHasValidCompactionIdWithBothFields() throws
JsonProcessingException
+ {
+ String json =
"{\"compactionType\":\"VALIDATION\",\"compactionId\":\"xyz-456\"}";
+ CompactionStopRequestPayload payload = MAPPER.readValue(json,
CompactionStopRequestPayload.class);
+ assertThat(payload.hasValidCompactionId()).isTrue();
+ assertThat(payload.hasValidCompactionType()).isTrue();
+ assertThat(payload.compactionId()).isEqualTo("xyz-456");
+ assertThat(payload.compactionType()).isEqualTo("VALIDATION");
+ }
+}
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index c3442373..2b290efb 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -34,6 +34,7 @@ import
org.apache.cassandra.sidecar.common.request.CassandraNativeHealthRequest;
import
org.apache.cassandra.sidecar.common.request.CleanSSTableUploadSessionRequest;
import org.apache.cassandra.sidecar.common.request.ClearSnapshotRequest;
import org.apache.cassandra.sidecar.common.request.CompactionStatsRequest;
+import org.apache.cassandra.sidecar.common.request.CompactionStopRequest;
import org.apache.cassandra.sidecar.common.request.ConnectedClientStatsRequest;
import org.apache.cassandra.sidecar.common.request.CreateSnapshotRequest;
import org.apache.cassandra.sidecar.common.request.GossipHealthRequest;
@@ -62,6 +63,7 @@ import
org.apache.cassandra.sidecar.common.request.TableStatsRequest;
import org.apache.cassandra.sidecar.common.request.TimeSkewRequest;
import org.apache.cassandra.sidecar.common.request.TokenRangeReplicasRequest;
import org.apache.cassandra.sidecar.common.request.UploadSSTableRequest;
+import
org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.Digest;
import
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
@@ -556,6 +558,18 @@ public class RequestContext
return request(new CompactionStatsRequest());
}
+ /**
+ * Sets the {@code request} to be a {@link CompactionStopRequest} and
returns a reference to this Builder
+ * enabling method chaining.
+ *
+ * @param payload the payload containing compaction type or ID to stop
+ * @return a reference to this Builder
+ */
+ public Builder compactionStopRequest(CompactionStopRequestPayload
payload)
+ {
+ return request(new CompactionStopRequest(payload));
+ }
+
/**
* Sets the {@code request} to be a {@link TableStatsRequest} and
returns a reference to this Builder
* enabling method chaining.
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index dacf4ded..15d0db85 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -55,6 +55,7 @@ import
org.apache.cassandra.sidecar.common.request.UpdateRestoreJobRequest;
import org.apache.cassandra.sidecar.common.request.UpdateServiceConfigRequest;
import
org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
import
org.apache.cassandra.sidecar.common.request.data.AllServicesConfigPayload;
+import
org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload;
import
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
import
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.Digest;
@@ -63,6 +64,7 @@ import
org.apache.cassandra.sidecar.common.request.data.RestoreJobProgressReques
import
org.apache.cassandra.sidecar.common.request.data.UpdateCdcServiceConfigPayload;
import
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse;
+import org.apache.cassandra.sidecar.common.response.CompactionStopResponse;
import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
@@ -768,6 +770,22 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
.build());
}
+ /**
+ * Executes the compaction stop request using the default retry policy and
provided {@code instance}.
+ *
+ * @param instance the instance where the request will be executed
+ * @param payload the payload containing compaction type or ID to stop
+ * @return a completable future of the compaction stop response
+ */
+ public CompletableFuture<CompactionStopResponse>
compactionStop(SidecarInstance instance,
+
CompactionStopRequestPayload payload)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+
.singleInstanceSelectionPolicy(instance)
+ .compactionStopRequest(payload)
+ .build());
+ }
+
/**
* Executes the table stats request using the default retry policy and
provided {@code instance}.
*
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 032a937c..08d41c74 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -63,8 +63,9 @@ import
org.apache.cassandra.sidecar.client.request.RequestExecutorTest;
import org.apache.cassandra.sidecar.client.retry.RetryAction;
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.CompactionStopStatus;
+import org.apache.cassandra.sidecar.common.data.Lifecycle;
import org.apache.cassandra.sidecar.common.data.Lifecycle.CassandraState;
-import org.apache.cassandra.sidecar.common.data.Lifecycle.OperationStatus;
import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
@@ -72,12 +73,14 @@ import
org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
import org.apache.cassandra.sidecar.common.request.Request;
import org.apache.cassandra.sidecar.common.request.Service;
import
org.apache.cassandra.sidecar.common.request.data.AllServicesConfigPayload;
+import
org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload;
import
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
import
org.apache.cassandra.sidecar.common.request.data.NodeCommandRequestPayload;
import
org.apache.cassandra.sidecar.common.request.data.UpdateCdcServiceConfigPayload;
import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse;
+import org.apache.cassandra.sidecar.common.response.CompactionStopResponse;
import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
@@ -88,7 +91,6 @@ import
org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
import
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
import org.apache.cassandra.sidecar.common.response.LiveMigrationStatus;
-import
org.apache.cassandra.sidecar.common.response.LiveMigrationStatus.MigrationState;
import org.apache.cassandra.sidecar.common.response.NodeSettings;
import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
import org.apache.cassandra.sidecar.common.response.RingResponse;
@@ -98,6 +100,7 @@ import
org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
import org.apache.cassandra.sidecar.common.response.TableStatsResponse;
import org.apache.cassandra.sidecar.common.response.TimeSkewResponse;
import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import
org.apache.cassandra.sidecar.common.response.LiveMigrationStatus.MigrationState;
import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
import
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload;
@@ -1753,6 +1756,63 @@ abstract class SidecarClientTest
}
}
+ @Test
+ void testCompactionStop() throws Exception
+ {
+ SidecarInstanceImpl instance = instances.get(0);
+ MockWebServer server = servers.get(0);
+
+ // Test stop by type
+ String responseByType =
"{\"status\":\"SUBMITTED\",\"compactionType\":\"COMPACTION\"}";
+ server.enqueue(new
MockResponse().setResponseCode(OK.code()).setBody(responseByType));
+
+ CompactionStopRequestPayload stopByType = new
CompactionStopRequestPayload("COMPACTION", null);
+ CompletableFuture<CompactionStopResponse> response1 =
client.compactionStop(instance, stopByType);
+ assertThat(response1).isNotNull();
+
+ CompactionStopResponse compactionResponse1 = response1.get();
+
+
assertThat(compactionResponse1.compactionType()).isEqualTo("COMPACTION");
+ assertThat(compactionResponse1.compactionId()).isEqualTo(null);
+
assertThat(compactionResponse1.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
+ // Test stop by ID
+ String responseById =
"{\"status\":\"SUBMITTED\",\"compactionId\":\"test-id-1\"}";
+ server.enqueue(new
MockResponse().setResponseCode(OK.code()).setBody(responseById));
+
+ CompactionStopRequestPayload stopById = new
CompactionStopRequestPayload(null, "test-id-1");
+ CompletableFuture<CompactionStopResponse> response2 =
client.compactionStop(instance, stopById);
+ CompactionStopResponse compactionResponse2 = response2.get();
+
+ assertThat(compactionResponse2.compactionType()).isEqualTo(null);
+ assertThat(compactionResponse2.compactionId()).isEqualTo("test-id-1");
+
assertThat(compactionResponse2.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
+ // Test id precedence when both inputs provided
+ String responseBothInputs = "{\"status\":\"SUBMITTED\"," +
+ "\"compactionId\":\"test-id-2\", " +
+ "\"compactionType\":\"VALIDATION\"}";
+ server.enqueue(new
MockResponse().setResponseCode(OK.code()).setBody(responseBothInputs));
+
+ CompactionStopRequestPayload stopAfterBothInputs
+ = new CompactionStopRequestPayload("VALIDATION", "test-id-2");
+ CompletableFuture<CompactionStopResponse> response3 =
client.compactionStop(instance, stopAfterBothInputs);
+ assertThat(response3).isNotNull();
+
+ CompactionStopResponse compactionResponse3 = response3.get();
+
assertThat(compactionResponse3.compactionType()).isEqualTo("VALIDATION");
+ assertThat(compactionResponse3.compactionId()).isEqualTo("test-id-2");
+
assertThat(compactionResponse3.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
+ // Verify the request body contains both fields when both are provided
+ server.takeRequest(); // First request (stop by type)
+ server.takeRequest(); // Second request (stop by ID)
+ RecordedRequest thirdRequest = server.takeRequest(); // Third request
(both inputs)
+ String requestBody =
thirdRequest.getBody().readString(Charset.defaultCharset());
+ assertThat(requestBody).contains("\"compactionId\":\"test-id-2\"");
+ assertThat(requestBody).contains("\"compactionType\":\"VALIDATION\"");
+ }
+
@Test
public void testCompactionStatsServerError() throws Exception
{
@@ -2081,7 +2141,7 @@ abstract class SidecarClientTest
assertThat(result).isNotNull();
assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING);
assertThat(result.desiredState()).isEqualTo(CassandraState.RUNNING);
- assertThat(result.status()).isEqualTo(OperationStatus.CONVERGED);
+
assertThat(result.status()).isEqualTo(Lifecycle.OperationStatus.CONVERGED);
assertThat(result.lastUpdate()).isEqualTo("Instance has started");
validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE);
@@ -2102,7 +2162,7 @@ abstract class SidecarClientTest
assertThat(result).isNotNull();
assertThat(result.currentState()).isEqualTo(CassandraState.RUNNING);
assertThat(result.desiredState()).isEqualTo(CassandraState.STOPPED);
- assertThat(result.status()).isEqualTo(OperationStatus.CONVERGING);
+
assertThat(result.status()).isEqualTo(Lifecycle.OperationStatus.CONVERGING);
assertThat(result.lastUpdate()).isEqualTo("Submitting stop task for
instance");
validateResponseServed(ApiEndpointsV1.LIFECYCLE_ROUTE, request -> {
diff --git a/integration-framework/build.gradle
b/integration-framework/build.gradle
index 5edf49ea..62248946 100644
--- a/integration-framework/build.gradle
+++ b/integration-framework/build.gradle
@@ -31,6 +31,7 @@ test {
dependencies {
implementation(project(":adapters:adapters-base"))
implementation(project(":adapters:adapters-cassandra41"))
+ implementation(project(":adapters:adapters-cassandra50"))
// Needed by the Cassandra dtest framework
// JUnit
diff --git
a/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
b/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
index 04aa7a8d..e0983012 100644
---
a/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
+++
b/integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory;
+import org.apache.cassandra.sidecar.adapters.cassandra50.Cassandra50Factory;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher;
@@ -116,6 +117,7 @@ public final class TestUtils
return new CassandraVersionProvider.Builder()
.add(new CassandraFactory(dnsResolver, driverUtils,
tableSchemaFetcher))
.add(new Cassandra41Factory(dnsResolver, driverUtils,
tableSchemaFetcher))
+ .add(new Cassandra50Factory(dnsResolver, driverUtils,
tableSchemaFetcher))
.build();
}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CompactionStopIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CompactionStopIntegrationTest.java
new file mode 100644
index 00000000..2c6e3ea4
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CompactionStopIntegrationTest.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.junit.jupiter.api.Test;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpResponseExpectation;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpResponse;
+
+import org.apache.cassandra.sidecar.common.data.CompactionStopStatus;
+import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse;
+import org.apache.cassandra.sidecar.common.response.CompactionStopResponse;
+import org.apache.cassandra.sidecar.common.response.data.CompactionInfo;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+import static io.vertx.core.buffer.Buffer.buffer;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.TEST_TABLE_PREFIX;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+/**
+ * Integration tests for the Compaction Stop API endpoint
+ */
+class CompactionStopIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
+{
+ private static final String COMPACTION_STOP_ROUTE =
"/api/v1/cassandra/operations/compaction/stop";
+ private static final String COMPACTION_STATS_ROUTE =
"/api/v1/cassandra/stats/compaction";
+ private static final QualifiedName TEST_TABLE
+ = new QualifiedName(TEST_KEYSPACE, TEST_TABLE_PREFIX + "_compaction_test");
+ private static final List<QualifiedName> COMPACTION_TEST_TABLES = new
ArrayList<>();
+ private static final int TABLE_COUNT = 5;
+
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .additionalInstanceConfig(Map.of(
+ "concurrent_compactors", 1, // Single
compactor for predictability
+ "compaction_throughput_mb_per_sec", 5, // Base
throttling at 5 MB/s
+ "auto_snapshot", "false", // Disable
auto snapshots
+ "compaction_large_partition_warning_threshold_mb", "1000",
// Avoid large partition warnings
+ "auto_compaction", "false" // Disable
ALL auto-compaction globally
+ ));
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+ createTestTable(TEST_TABLE, "CREATE TABLE %s (\n id int PRIMARY KEY,
\n data text \n);");
+ createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+
+ for (int i = 1; i <= TABLE_COUNT; i++)
+ {
+ COMPACTION_TEST_TABLES.add(new QualifiedName(TEST_KEYSPACE,
TEST_TABLE_PREFIX + "_compaction_" + i));
+ }
+
+ // Create test tables for compaction activity
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ createTestTable(tableName, "CREATE TABLE %s ( \n id int PRIMARY
KEY, \n data text \n);");
+ }
+ }
+
+ @Override
+ protected void beforeTestStart()
+ {
+ // Wait for schema initialization
+ waitForSchemaReady(30, TimeUnit.SECONDS);
+
+ // Disable auto-compaction for ALL keyspaces at the beginning
+ cluster.stream().forEach(instance -> {
+ try
+ {
+ // First set compaction throughput to a high value to prevent
any initial compactions from taking too long
+ instance.nodetool("setcompactionthroughput", "100");
+
+ // Disable auto-compaction globally (no arguments)
+ instance.nodetool("disableautocompaction");
+ logger.info("Disabled auto-compaction globally");
+
+ // And for our test keyspace
+ instance.nodetool("disableautocompaction", TEST_KEYSPACE);
+
+ // Log that we've disabled auto-compaction
+ logger.info("Auto-compaction disabled for all keyspaces");
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to disable autocompaction in
beforeTestStart: {}", e.getMessage());
+ }
+ });
+ }
+
+ @Test
+ void testStopCompactionBothParameters()
+ {
+ String payload =
"{\"compactionType\":\"VALIDATION\",\"compactionId\":\"test-id-123\"}";
+ HttpResponse<Buffer> response
+ = getBlocking(trustedClient().put(serverWrapper.serverPort,
"localhost", COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(payload))
+
.expecting(HttpResponseExpectation.SC_OK));
+
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ CompactionStopResponse stopResponse =
response.bodyAsJson(CompactionStopResponse.class);
+ assertThat(stopResponse).isNotNull();
+
assertThat(stopResponse.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+ assertThat(stopResponse.compactionType()).isEqualTo("VALIDATION");
+ assertThat(stopResponse.compactionId()).isEqualTo("test-id-123");
+ }
+
+ @Test
+ void testStopCompactionMissingBothParameters()
+ {
+ String payload = "{}";
+ HttpResponse<Buffer> response
+ = getBlocking(trustedClient().put(serverWrapper.serverPort,
"localhost", COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(payload)));
+
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code());
+ JsonObject errorResponse = response.bodyAsJsonObject();
+ assertThat(errorResponse).isNotNull();
+ }
+
+ @Test
+ void testStopCompactionInvalidType()
+ {
+ String payload = "{\"compactionType\":\"INVALID_TYPE\"}";
+ HttpResponse<Buffer> response
+ = getBlocking(trustedClient().put(serverWrapper.serverPort,
"localhost", COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(payload)));
+
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code());
+ }
+
+ @Test
+ void testStopCompactionMalformedJson()
+ {
+ String payload = "{invalid json";
+ HttpResponse<Buffer> response
+ = getBlocking(trustedClient().put(serverWrapper.serverPort,
"localhost", COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(payload)));
+
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code());
+ }
+
+ @Test
+ void testStopCompactionAllSupportedTypes()
+ {
+ String[] supportedTypes = {
+ "COMPACTION", "VALIDATION", "KEY_CACHE_SAVE", "ROW_CACHE_SAVE",
+ "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES",
+ "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION",
+ "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE",
+ "GARBAGE_COLLECT", "MAJOR_COMPACTION"
+ };
+ String cassandraVersion = testVersion.version();
+
+ for (String compactionType : supportedTypes)
+ {
+ String payload = String.format("{\"compactionType\":\"%s\"}",
compactionType);
+
+ HttpResponse<Buffer> response = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "localhost",
COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(payload))
+ );
+ if (compactionType.equals("MAJOR_COMPACTION") &&
cassandraVersion.startsWith("4."))
+ {
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code());
+
+ }
+ else
+ {
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ CompactionStopResponse stopResponse =
response.bodyAsJson(CompactionStopResponse.class);
+
assertThat(stopResponse.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
assertThat(stopResponse.compactionType()).isEqualTo(compactionType);
+ }
+ }
+ }
+
+ @Test
+ void testUnsupportedCompactionTypeForCassandraVersion()
+ {
+ String payload = "{\"compactionType\":\"MAJOR_COMPACTION\"}";
+ HttpResponse<Buffer> response = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "localhost",
COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(payload))
+ );
+ String cassandraVersion = testVersion.version();
+
+ // Check MAJOR_COMPACTION rejected with Cassandra 4.x, accepted with
5.x
+ if (cassandraVersion.startsWith("4."))
+ {
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code());
+ JsonObject errorResponse = response.bodyAsJsonObject();
+ assertThat(errorResponse).isNotNull();
+ // Error message could be from handler validation or JMX layer
+ String message = errorResponse.getString("message");
+ assertThat(message)
+ .satisfiesAnyOf(
+ msg -> assertThat(msg).containsIgnoringCase("not
supported"),
+ msg -> assertThat(msg).containsIgnoringCase("No enum
constant"),
+ msg -> assertThat(msg).contains("MAJOR_COMPACTION")
+ );
+ }
+ else if (cassandraVersion.startsWith("5."))
+ {
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ CompactionStopResponse stopResponse =
response.bodyAsJson(CompactionStopResponse.class);
+
assertThat(stopResponse.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
assertThat(stopResponse.compactionType()).isEqualTo("MAJOR_COMPACTION");
+ }
+ else
+ {
+ // Unknown Cassandra version
+ throw new AssertionError("Unexpected Cassandra version: " +
cassandraVersion);
+ }
+ }
+
+ private void generateSSTables(QualifiedName tableName, int ssTableCount)
+ {
+ String largeData = "x".repeat(1000); // 1KB of data per row
+
+ // Double-check auto-compaction is disabled before generating data
+ cluster.stream().forEach(instance -> {
+ try
+ {
+ instance.nodetool("disableautocompaction", TEST_KEYSPACE,
tableName.table());
+ logger.info("Confirmed auto-compaction disabled for table {}
before data generation",
+ tableName.table());
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to confirm auto-compaction is disabled:
{}", e.getMessage());
+ }
+ });
+
+ int rowsPerBatch = 500;
+
+ for (int batch = 0; batch < ssTableCount; batch++)
+ {
+ logger.info("Generating batch {} of {} for table {}", batch + 1,
ssTableCount, tableName.table());
+
+ for (int i = batch * rowsPerBatch; i < (batch + 1) * rowsPerBatch;
i++)
+ {
+ String statement = String.format("INSERT INTO %s (id, data)
VALUES (%d, '%s');",
+ tableName, i, largeData + i);
+ cluster.schemaChangeIgnoringStoppedInstances(statement);
+ }
+
+ // Flush after each batch but verify compaction is still disabled
+ final int currentBatch = batch + 1;
+ cluster.stream().forEach(instance -> {
+ try
+ {
+ // Flush only accepts one parameter (keyspace)
+ instance.flush(TEST_KEYSPACE);
+ logger.debug("Flushed keyspace {} for table {}",
TEST_KEYSPACE, tableName.table());
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to flush: {}", e.getMessage());
+ }
+ });
+ }
+ }
+
+ /**
+ * Verifies that compactions of the specified type are no longer active
+ */
+ private void verifyCompactionStopped(String detectedCompactionId)
+ {
+ loopAssert(10, () -> {
+ HttpResponse<Buffer> statsResponse
+ = getBlocking(trustedClient().get(serverWrapper.serverPort,
"localhost", COMPACTION_STATS_ROUTE)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
+
+ CompactionStatsResponse stats =
statsResponse.bodyAsJson(CompactionStatsResponse.class);
+
+ // Check if compactions of this TYPE are gone from active
compactions
+ boolean compactionsOfTypeGone = stats.activeCompactions()
+ .stream()
+ .noneMatch(c ->
c.id().equals(detectedCompactionId));
+
+ logger.info("Verification: Compaction with id {} type {} are
gone={}, active count={}",
+ detectedCompactionId, "COMPACTION",
compactionsOfTypeGone,
+ stats.activeCompactionsCount());
+
+ assertThat(compactionsOfTypeGone).isTrue();
+ });
+ }
+
+ @Test
+ void testCompactionStopByTypeActuallyStopped()
+ {
+ long startTime = System.currentTimeMillis();
+
+ try
+ {
+ logger.info("Testing that compaction stop by type actually stops
compactions");
+
+ // 2. THEN set compaction throughput to slow value
+ cluster.stream().forEach(instance -> {
+ try
+ {
+ instance.nodetool("setcompactionthroughput", "1"); // 1
MB/sec rather than unlimited
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to set compaction throughput for
stopByType: {}", e.getMessage());
+ }
+ });
+
+ // 3. THEN generate data (with reduced volume)
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ generateSSTables(tableName, 20); // Reduced from 200 to 20
+ }
+
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ cluster.stream().forEach(instance -> {
+ try
+ {
+ instance.nodetool("enableautocompaction",
TEST_KEYSPACE, tableName.table());
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to re-enable autocompaction: {}",
e.getMessage());
+ }
+ });
+ }
+
+ // Add initial delay to allow compaction to start
+ logger.info("Waiting for compaction to start...");
+
+ // Poll for active compaction and stop it
+ boolean compactionStopped =
pollAndStopCompactionByType("Compaction", 30);
+
+ if (!compactionStopped)
+ {
+ logger.error("Could not catch compaction in testable state -
skipping test");
+ }
+ }
+ finally
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ logger.info("Test completed in {} ms", duration);
+ }
+ }
+
+ /**
+ * Polls for an active compaction and attempts to stop it by type
+ *
+ * @param compactionType The type of compaction to look for and stop
+ * @param maxAttempts Maximum number of polling attempts
+ * @return true if a compaction was found and stopped successfully, false
otherwise
+ */
+ private boolean pollAndStopCompactionByType(String compactionType, int
maxAttempts)
+ {
+ AtomicBoolean compactionStopped = new AtomicBoolean(false);
+ AtomicReference<Double> startingProgress = new AtomicReference<>(0.0);
+ AtomicReference<String> actualCompactionType = new
AtomicReference<>(compactionType);
+
+ loopAssert(maxAttempts, () -> {
+ // Get current compaction stats
+ HttpResponse<Buffer> statsResponse
+ = getBlocking(trustedClient().get(serverWrapper.serverPort,
"localhost", COMPACTION_STATS_ROUTE)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
+
+ CompactionStatsResponse stats =
statsResponse.bodyAsJson(CompactionStatsResponse.class);
+
+ if (stats.activeCompactions().isEmpty())
+ {
+ logger.info("No active compactions found yet");
+ Uninterruptibles.sleepUninterruptibly(1000,
TimeUnit.MILLISECONDS);
+ throw new AssertionError("No active compactions found yet");
+ }
+
+ // Found active compaction
+ CompactionInfo compaction = stats.activeCompactions().get(0);
+ double progress = compaction.percentCompleted();
+
+ // Only proceed if compaction is in progress but not nearly
complete
+ if (progress <= 0.0 || progress >= 90.0)
+ {
+ logger.info("Compaction at {}% - waiting for suitable
progress", progress);
+ throw new AssertionError("Compaction not in suitable state to
stop");
+ }
+
+ // Found a suitable compaction to stop
+ startingProgress.set(progress);
+ String originalTaskType = compaction.taskType();
+ actualCompactionType.set(originalTaskType.toUpperCase());
+
+ logger.info("Found in-progress compaction - Type: '{}', Progress:
{}%, ID: {}",
+ actualCompactionType.get(), progress, compaction.id());
+
+ // Stop compaction by type
+ String stopPayload = "{\"compactionType\":\"" +
actualCompactionType.get() + "\"}";
+
+ HttpResponse<Buffer> stopResponse
+ = getBlocking(trustedClient().put(serverWrapper.serverPort,
"localhost", COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(stopPayload))
+
.expecting(HttpResponseExpectation.SC_OK));
+
+
assertThat(stopResponse.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ CompactionStopResponse response =
stopResponse.bodyAsJson(CompactionStopResponse.class);
+
assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
+ logger.info("Compaction stop called successfully for type: {} at
{}% progress",
+ actualCompactionType.get(), startingProgress.get());
+
+ // Verify compaction was stopped
+ verifyCompactionStopped(compaction.id());
+
+ compactionStopped.set(true);
+ });
+
+ return compactionStopped.get();
+ }
+
+ @Test
+ void testCompactionStopByIdActuallyStopped()
+ {
+ long startTime = System.currentTimeMillis();
+
+ try
+ {
+ logger.info("Testing that compaction stop by ID actually stops
compactions");
+
+ // 2. THEN set compaction throughput to slow value
+ cluster.stream().forEach(instance -> {
+ try
+ {
+ instance.nodetool("setcompactionthroughput", "1"); // 1
MB/sec rather than unlimited
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to set compaction throughput for
stopById: {}", e.getMessage());
+ }
+ });
+
+ // 3. THEN generate data (with reduced volume)
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ generateSSTables(tableName, 20); // Reduced from 200 to 20
+ }
+
+ for (QualifiedName tableName : COMPACTION_TEST_TABLES)
+ {
+ cluster.stream().forEach(instance -> {
+ try
+ {
+ instance.nodetool("enableautocompaction",
TEST_KEYSPACE, tableName.table());
+ }
+ catch (Exception e)
+ {
+ logger.warn("Failed to re-enable autocompaction: {}",
e.getMessage());
+ }
+ });
+ }
+
+ // Add initial delay to allow compaction to start
+ logger.info("Waiting for compaction to start...");
+
+ // Poll for active compaction and stop it by ID
+ try
+ {
+ pollAndStopCompactionById(30);
+ }
+ catch (Exception e)
+ {
+ logger.warn("Could not catch compaction in testable state");
+ }
+ }
+ finally
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ logger.info("Test completed in {} ms", duration);
+ }
+ }
+
+ /**
+ * Polls for an active compaction and attempts to stop it by ID
+ *
+ * @param maxAttempts Maximum number of polling attempts
+ * @return true if a compaction was found and stopped successfully, false
otherwise
+ */
+ private boolean pollAndStopCompactionById(int maxAttempts)
+ {
+ AtomicBoolean compactionStopped = new AtomicBoolean(false);
+ AtomicReference<Double> startingProgress = new AtomicReference<>(0.0);
+ AtomicReference<String> capturedCompactionId = new
AtomicReference<>("");
+
+ loopAssert(maxAttempts, () -> {
+ // Get current compaction stats
+ HttpResponse<Buffer> statsResponse
+ = getBlocking(trustedClient().get(serverWrapper.serverPort,
"localhost", COMPACTION_STATS_ROUTE)
+ .send()
+
.expecting(HttpResponseExpectation.SC_OK));
+
+ CompactionStatsResponse stats =
statsResponse.bodyAsJson(CompactionStatsResponse.class);
+
+ if (stats.activeCompactions().isEmpty())
+ {
+ logger.info("No active compactions found yet");
+ Uninterruptibles.sleepUninterruptibly(1000,
TimeUnit.MILLISECONDS);
+ throw new AssertionError("No active compactions found yet");
+ }
+
+ // Found active compaction
+ CompactionInfo compaction = stats.activeCompactions().get(0);
+ double progress = compaction.percentCompleted();
+
+ // Only proceed if compaction is in progress but not nearly
complete
+ if (progress <= 0.0 || progress >= 90.0)
+ {
+ logger.info("Compaction at {}% - waiting for suitable
progress", progress);
+ throw new AssertionError("Compaction not in suitable state to
stop");
+ }
+
+ // Found a suitable compaction to stop - capture its ID
+ startingProgress.set(progress);
+ capturedCompactionId.set(compaction.id());
+
+ logger.info("Found in-progress compaction - Type: '{}', Progress:
{}%, ID: {}",
+ compaction.taskType(), progress,
capturedCompactionId.get());
+
+ // Stop compaction by ID
+ String stopPayload = "{\"compactionId\":\"" +
capturedCompactionId.get() + "\"}";
+
+ HttpResponse<Buffer> stopResponse
+ = getBlocking(trustedClient().put(serverWrapper.serverPort,
"localhost", COMPACTION_STOP_ROUTE)
+ .sendBuffer(buffer(stopPayload))
+
.expecting(HttpResponseExpectation.SC_OK));
+
+ assertThat(stopResponse).isNotNull();
+
assertThat(stopResponse.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ CompactionStopResponse response =
stopResponse.bodyAsJson(CompactionStopResponse.class);
+
assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
assertThat(response.compactionId()).isEqualTo(capturedCompactionId.get());
+
+ logger.info("Compaction stop called successfully for ID: {} at {}%
progress",
+ capturedCompactionId.get(), startingProgress.get());
+
+ // Verify compaction was stopped
+ verifyCompactionStopped(capturedCompactionId.get());
+
+ compactionStopped.set(true);
+ });
+
+ return compactionStopped.get();
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java
index eddc6d7f..9d9395c4 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CompactionManagerOperations.java
@@ -32,4 +32,24 @@ public interface CompactionManagerOperations
* @return list of compaction info maps
*/
List<Map<String, String>> getCompactions();
+
+ /**
+ * Stops compaction based on compaction ID.
+ * This method takes precedence over stopCompaction if both type and ID
are provided.
+ *
+ * @param compactionId the compaction ID to stop (nullable)
+ * @throws IllegalArgumentException if both parameters are null or empty
+ */
+ void stopCompactionById(String compactionId);
+
+ /**
+ * Stops compaction based on type if no compaction ID is provided.
+ * Checks for unupported compaction type across Cassandra versions, as set
of compactions type varies slightly
+ * between Cassandra.4x and Cassandra.5x
+ * @param compactionType the compaction ID to stop (nullable)
+ * @throws IllegalArgumentException if both parameters are null or empty,
or of the provided compactionType is unsupported
+ */
+ void stopCompaction(String compactionType);
+
+ List<String> supportedCompactionTypes();
}
diff --git a/server/build.gradle b/server/build.gradle
index a4a1418d..97cac99a 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -180,6 +180,7 @@ dependencies {
implementation(project(":server-common"))
implementation(project(":adapters:adapters-base"))
implementation(project(":adapters:adapters-cassandra41"))
+ implementation(project(":adapters:adapters-cassandra50"))
implementation(project(":vertx-auth-mtls"))
implementation(project(":vertx-client"))
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index f25e4b34..9e9943eb 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -80,6 +80,7 @@ public class BasicPermissions
public static final Permission READ_RING_KEYSPACE_SCOPED = new
DomainAwarePermission("RING:READ", KEYSPACE_SCOPE);
public static final Permission READ_TOPOLOGY = new
DomainAwarePermission("TOPOLOGY:READ", KEYSPACE_SCOPE);
public static final Permission MODIFY_NATIVE = new
DomainAwarePermission("NATIVE:MODIFY", CLUSTER_SCOPE);
+ public static final Permission MODIFY_COMPACTION = new
DomainAwarePermission("COMPACTION:MODIFY", CLUSTER_SCOPE);
// cassandra stats permissions
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandler.java
new file mode 100644
index 00000000..3c712d19
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandler.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.data.CompactionStopStatus;
+import
org.apache.cassandra.sidecar.common.request.data.CompactionStopRequestPayload;
+import org.apache.cassandra.sidecar.common.response.CompactionStopResponse;
+import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for stopping compaction operations via the Cassandra Compaction
Manager.
+ *
+ * <p>Handles {@code PUT /api/v1/cassandra/operations/compaction/stop}
requests to stop
+ * compaction operations. Expects a JSON payload with compactionType and/or
compactionId:
+ * <pre>
+ * { "compactionType": "COMPACTION", "compactionId": "abc-123" }
+ * </pre>
+ */
+public class CompactionStopHandler extends
AbstractHandler<CompactionStopRequestPayload> implements AccessProtected
+{
+ /**
+ * Constructs a handler with the provided {@code metadataFetcher}
+ *
+ * @param metadataFetcher the metadata fetcher
+ * @param executorPools executor pools for blocking executions
+ * @param validator validator for Cassandra-specific input
+ */
+ @Inject
+ protected CompactionStopHandler(final InstanceMetadataFetcher
metadataFetcher,
+ final ExecutorPools executorPools,
+ final CassandraInputValidator validator)
+ {
+ super(metadataFetcher, executorPools, validator);
+ }
+
+ @Override
+ public Set<Authorization> requiredAuthorizations()
+ {
+ return
Collections.singleton(BasicPermissions.MODIFY_COMPACTION.toAuthorization());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ @NotNull String host,
+ SocketAddress remoteAddress,
+ CompactionStopRequestPayload request)
+ {
+ CompactionManagerOperations compactionManagerOps =
metadataFetcher.delegate(host).compactionManagerOperations();
+
+ executorPools.service()
+ .executeBlocking(() ->
stopCompaction(compactionManagerOps, request))
+ .onSuccess(context::json)
+ .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, request));
+ }
+
+ /**
+ * Stops the compaction based on the request parameters
+ *
+ * @param operations the compaction manager operations
+ * @param request the request payload containing compactionType and/or
compactionId
+ * @return CompactionStopResponse with the operation result
+ */
+ private CompactionStopResponse stopCompaction(CompactionManagerOperations
operations,
+ CompactionStopRequestPayload
request)
+ {
+ String compactionType = request.compactionType();
+ String compactionId = request.compactionId();
+
+ // Attempt to stop compaction
+ // If compactionId provided, use it (takes precedence over type)
+ if (request.hasValidCompactionId())
+ {
+ operations.stopCompactionById(compactionId);
+ }
+ else if (request.hasValidCompactionType())
+ {
+ operations.stopCompaction(compactionType);
+ }
+ // If we reach here, at least one of the above conditions was true due
to validation in extractParamsOrThrow()
+ // Return success response
+ return CompactionStopResponse.builder()
+ .compactionType(compactionType)
+ .compactionId(compactionId)
+ .status(CompactionStopStatus.SUBMITTED)
+ .build();
+ }
+
+ /**
+ * Override extractParamsOrThrow to support compactionStop param
constraints
+ * Method extracts and validates compaction stop request from routing
context
+ *
+ * @param context the routing context
+ * @return validated CompactionStopRequestPayload
+ */
+ @Override
+ protected CompactionStopRequestPayload extractParamsOrThrow(RoutingContext
context)
+ {
+ String body = context.body().asString();
+ CompactionStopRequestPayload payload;
+
+ /*
+ Return 400 BAD_REQUEST upon malformed JSON - avoids falling under
processFailure()'s vague
+ 500 INTERNAL_SERVER_ERROR. Also catches invalid compaction types
+ */
+ try
+ {
+ payload = Json.decodeValue(body,
CompactionStopRequestPayload.class);
+ }
+ catch (DecodeException e)
+ {
+ throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid
JSON payload: " + e.getMessage());
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
e.getMessage());
+ }
+
+ // Validate that at least one field is provided
+ if (!payload.atLeastOneParamProvided())
+ {
+ throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+ "At least one of 'compactionType' or
'compactionId' must be provided");
+ }
+
+ return payload;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
index 8f7a2f6e..b1588b11 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
@@ -27,6 +27,7 @@ import
org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSche
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import org.apache.cassandra.sidecar.common.ApiEndpointsV2;
import org.apache.cassandra.sidecar.common.response.CompactionStatsResponse;
+import org.apache.cassandra.sidecar.common.response.CompactionStopResponse;
import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
@@ -39,6 +40,7 @@ import
org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
import org.apache.cassandra.sidecar.common.response.v2.V2NodeSettings;
import org.apache.cassandra.sidecar.db.schema.TableSchema;
import org.apache.cassandra.sidecar.handlers.CompactionStatsHandler;
+import org.apache.cassandra.sidecar.handlers.CompactionStopHandler;
import org.apache.cassandra.sidecar.handlers.ConnectedClientStatsHandler;
import org.apache.cassandra.sidecar.handlers.GossipInfoHandler;
import org.apache.cassandra.sidecar.handlers.GossipUpdateHandler;
@@ -113,6 +115,27 @@ public class CassandraOperationsModule extends
AbstractModule
return factory.buildRouteWithHandler(compactionStatsHandler);
}
+ @PUT
+ @Path(ApiEndpointsV1.COMPACTION_STOP_ROUTE)
+ @Operation(summary = "Stop compaction operation",
+ description = "Stops a compaction operation on the Cassandra
node")
+ @APIResponse(description = "Compaction stop operation completed",
+ responseCode = "200",
+ content = @Content(mediaType = "application/json",
+ schema = @Schema(implementation =
CompactionStopResponse.class)))
+ @APIResponse(responseCode = "400",
+ description = "Invalid request - malformed JSON body or \n
invalid compaction parameters")
+ @ProvidesIntoMap
+ @KeyClassMapKey(VertxRouteMapKeys.CassandraCompactionStopRouteKey.class)
+ VertxRoute cassandraCompactionStopRoute(RouteBuilder.Factory factory,
+ CompactionStopHandler
compactionStopHandler)
+ {
+ return factory.builderForRoute()
+ .setBodyHandler(true) // IMPORTANT: needed for JSON
body parsing
+ .handler(compactionStopHandler)
+ .build();
+ }
+
@GET
@Path(ApiEndpointsV1.OPERATIONAL_JOB_ROUTE)
@Operation(summary = "Get operational job status",
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
index ad21dd45..dae5b61d 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java
@@ -36,6 +36,7 @@ import com.google.inject.name.Named;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory;
+import org.apache.cassandra.sidecar.adapters.cassandra50.Cassandra50Factory;
import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
@@ -132,7 +133,8 @@ public class ConfigurationModule extends AbstractModule
return new CassandraVersionProvider.Builder()
.add(new CassandraFactory(dnsResolver, driverUtils,
tableSchemaFetcher))
.add(new Cassandra41Factory(dnsResolver, driverUtils,
tableSchemaFetcher))
- .build();
+ .add(new Cassandra50Factory(dnsResolver, driverUtils,
tableSchemaFetcher))
+ .build();
}
@Provides
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
index 9875d90b..2e0ce9be 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
@@ -54,6 +54,11 @@ public interface VertxRouteMapKeys
HttpMethod HTTP_METHOD = HttpMethod.GET;
String ROUTE_URI = ApiEndpointsV1.COMPACTION_STATS_ROUTE;
}
+ interface CassandraCompactionStopRouteKey extends RouteClassKey
+ {
+ HttpMethod HTTP_METHOD = HttpMethod.PUT;
+ String ROUTE_URI = ApiEndpointsV1.COMPACTION_STOP_ROUTE;
+ }
interface CassandraGossipHealthRouteKey extends RouteClassKey
{
HttpMethod HTTP_METHOD = HttpMethod.GET;
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index f4167d01..f502e801 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory;
+import org.apache.cassandra.sidecar.adapters.cassandra50.Cassandra50Factory;
import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
@@ -130,6 +131,7 @@ public class CassandraSidecarTestContext implements
AutoCloseable
return new CassandraVersionProvider.Builder()
.add(new CassandraFactory(dnsResolver, driverUtils,
tableSchemaFetcher))
.add(new Cassandra41Factory(dnsResolver, driverUtils,
tableSchemaFetcher))
+ .add(new Cassandra50Factory(dnsResolver, driverUtils,
tableSchemaFetcher))
.build();
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandlerTest.java
new file mode 100644
index 00000000..523a66ba
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/CompactionStopHandlerTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.CompactionStopStatus;
+import org.apache.cassandra.sidecar.common.response.CompactionStopResponse;
+import org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.vertx.core.buffer.Buffer.buffer;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link CompactionStopHandler} class
+ */
+@ExtendWith(VertxExtension.class)
+public class CompactionStopHandlerTest
+{
+ private static final String TEST_ROUTE =
"/api/v1/cassandra/operations/compaction/stop";
+
+ Vertx vertx;
+ Server server;
+ CompactionManagerOperations mockCompactionManagerOperations =
mock(CompactionManagerOperations.class);
+
+ @BeforeEach
+ void before() throws InterruptedException
+ {
+ Injector injector;
+ Module testOverride = Modules.override(new TestModule()).with(new
CompactionStopHandlerTestModule());
+ injector =
Guice.createInjector(Modules.override(SidecarModules.all()).with(testOverride));
+ vertx = injector.getInstance(Vertx.class);
+ server = injector.getInstance(Server.class);
+ VertxTestContext context = new VertxTestContext();
+ server.start().onSuccess(s ->
context.completeNow()).onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+
+ // Mock supportedCompactionTypes to return all types for testing
+
when(mockCompactionManagerOperations.supportedCompactionTypes()).thenReturn(
+ java.util.Arrays.asList("COMPACTION", "VALIDATION",
"KEY_CACHE_SAVE", "ROW_CACHE_SAVE",
+ "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES",
+ "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION",
+ "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE",
+ "GARBAGE_COLLECT", "MAJOR_COMPACTION"));
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ if (server != null)
+ {
+ VertxTestContext closeContext = new VertxTestContext();
+ server.close()
+ .onComplete(res -> closeContext.completeNow());
+ closeContext.awaitCompletion(60, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ void testStopCompactionByTypeHappyPath(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload = "{\"compactionType\":\"COMPACTION\"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ verify(mockCompactionManagerOperations,
times(1)).stopCompaction(eq("COMPACTION"));
+
+ assertThat(resp.statusCode()).isEqualTo(OK.code());
+ CompactionStopResponse response =
resp.bodyAsJson(CompactionStopResponse.class);
+
assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
assertThat(response.compactionType()).isEqualTo("COMPACTION");
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testStopCompactionByIdHappyPath(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload = "{\"compactionId\":\"abc-123\"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ verify(mockCompactionManagerOperations,
times(1)).stopCompactionById(eq("abc-123"));
+
+ assertThat(resp.statusCode()).isEqualTo(OK.code());
+ CompactionStopResponse response =
resp.bodyAsJson(CompactionStopResponse.class);
+
assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+ assertThat(response.compactionId()).isEqualTo("abc-123");
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testStopCompactionByBothFieldsHappyPath(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload =
"{\"compactionType\":\"VALIDATION\",\"compactionId\":\"xyz-456\"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ verify(mockCompactionManagerOperations, times(1))
+ .stopCompactionById(eq("xyz-456"));
+
+ assertThat(resp.statusCode()).isEqualTo(OK.code());
+ CompactionStopResponse response =
resp.bodyAsJson(CompactionStopResponse.class);
+
assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+
assertThat(response.compactionType()).isEqualTo("VALIDATION");
+ assertThat(response.compactionId()).isEqualTo("xyz-456");
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testMissingBothFields(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload = "{}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+
assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code());
+ verify(mockCompactionManagerOperations, times(0))
+ .stopCompaction(anyString());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testBothFieldsEmpty(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload = "{\"compactionType\":\"\",\"compactionId\":\"\"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+
assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code());
+ verify(mockCompactionManagerOperations, times(0))
+ .stopCompaction(anyString());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testInvalidCompactionType(VertxTestContext ctx)
+ {
+ // Configure mock to throw exception for invalid compaction type
+ doThrow(new IllegalArgumentException("compaction type INVALID_TYPE is
not supported"))
+
.when(mockCompactionManagerOperations).stopCompaction("INVALID_TYPE");
+
+ WebClient client = WebClient.create(vertx);
+ String payload = "{\"compactionType\":\"INVALID_TYPE\"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+
assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code());
+ verify(mockCompactionManagerOperations, times(1))
+ .stopCompaction(eq("INVALID_TYPE"));
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testMalformedJson(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload = "{invalid json";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+
assertThat(resp.statusCode()).isEqualTo(BAD_REQUEST.code());
+ verify(mockCompactionManagerOperations, times(0))
+ .stopCompaction(anyString());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testTrimWhitespace(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload = "{\"compactionType\":\" COMPACTION \"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ // Should trim and pass as uppercase enum name
+ verify(mockCompactionManagerOperations, times(1))
+ .stopCompaction(eq("COMPACTION"));
+
+ assertThat(resp.statusCode()).isEqualTo(OK.code());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testCaseInsensitiveCompactionType(VertxTestContext ctx)
+ {
+ WebClient client = WebClient.create(vertx);
+ String payload = "{\"compactionType\":\"compaction\"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ // Should accept lowercase input and send uppercase to
Cassandra
+ verify(mockCompactionManagerOperations, times(1))
+ .stopCompaction(eq("COMPACTION"));
+
+ assertThat(resp.statusCode()).isEqualTo(OK.code());
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ @Test
+ void testAllSupportedCompactionTypes(VertxTestContext ctx) throws
InterruptedException
+ {
+ String[] supportedTypes = {
+ "COMPACTION", "VALIDATION", "KEY_CACHE_SAVE", "ROW_CACHE_SAVE",
+ "COUNTER_CACHE_SAVE", "CLEANUP", "SCRUB", "UPGRADE_SSTABLES",
+ "INDEX_BUILD", "TOMBSTONE_COMPACTION", "ANTICOMPACTION",
+ "VERIFY", "VIEW_BUILD", "INDEX_SUMMARY", "RELOCATE",
+ "GARBAGE_COLLECT", "MAJOR_COMPACTION"
+ };
+
+ WebClient client = WebClient.create(vertx);
+
+ CountDownLatch expectedCalls = new
CountDownLatch(supportedTypes.length);
+ for (String type : supportedTypes)
+ {
+ String payload = "{\"compactionType\":\"" + type + "\"}";
+ client.put(server.actualPort(), "127.0.0.1", TEST_ROUTE)
+ .sendBuffer(buffer(payload), ctx.succeeding(resp -> {
+ ctx.verify(() -> {
+ assertThat(resp.statusCode()).isEqualTo(OK.code());
+ CompactionStopResponse response =
resp.bodyAsJson(CompactionStopResponse.class);
+
assertThat(response.status()).isEqualTo(CompactionStopStatus.SUBMITTED);
+ expectedCalls.countDown();
+ });
+ }));
+ }
+ expectedCalls.await(30, TimeUnit.SECONDS);
+ ctx.completeNow();
+ }
+
+ /**
+ * Test guice module for {@link CompactionStopHandler} tests
+ */
+ class CompactionStopHandlerTestModule extends AbstractModule
+ {
+ @Provides
+ @Singleton
+ public InstancesMetadata instanceMetadata()
+ {
+ final int instanceId = 100;
+ final String host = "127.0.0.1";
+ final InstanceMetadata instanceMetadata =
mock(InstanceMetadata.class);
+ when(instanceMetadata.host()).thenReturn(host);
+ when(instanceMetadata.port()).thenReturn(9042);
+ when(instanceMetadata.id()).thenReturn(instanceId);
+ when(instanceMetadata.stagingDir()).thenReturn("");
+
+ CassandraAdapterDelegate delegate =
mock(CassandraAdapterDelegate.class);
+
when(delegate.compactionManagerOperations()).thenReturn(mockCompactionManagerOperations);
+ when(instanceMetadata.delegate()).thenReturn(delegate);
+
+ InstancesMetadata mockInstancesMetadata =
mock(InstancesMetadata.class);
+
when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+
when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+
when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+ return mockInstancesMetadata;
+ }
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 5d91edb0..cfb61678 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,6 +29,7 @@ include "vertx-client-shaded"
if (JavaVersion.current().isJava11Compatible()) {
include "adapters:adapters-base"
include "adapters:adapters-cassandra41"
+ include "adapters:adapters-cassandra50"
include "docs"
include "server"
include "server-common"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]