This is an automated email from the ASF dual-hosted git repository.
pauloricardomg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 64bae6bb CASSSIDECAR-373: Define storage provider interface and Cassandra implementation to support durable job tracking (#339)
64bae6bb is described below
commit 64bae6bb7e2467b0f61cf88e7f7ed7baf294c78f
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Wed Jun 10 15:31:53 2026 -0400
CASSSIDECAR-373: Define storage provider interface and Cassandra implementation to support durable job tracking (#339)
Patch by Andrés Beck-Ruiz; Reviewed by Isaac Reath, Paulo Motta, Saranya
Krishnakuma for CASSSIDECAR-373
---
CHANGES.txt | 1 +
.../sidecar/common/data/OperationType.java | 30 +++
.../config/OperationalJobConfiguration.java | 32 +++
.../sidecar/config/SidecarConfiguration.java | 5 +
.../yaml/OperationalJobConfigurationImpl.java | 95 +++++++++
.../config/yaml/SidecarConfigurationImpl.java | 17 ++
.../db/ActiveClusterOpsDatabaseAccessor.java | 103 +++++++++
.../sidecar/db/ClusterOpsDatabaseAccessor.java | 177 ++++++++++++++++
.../db/ClusterOpsNodeStateDatabaseAccessor.java | 120 +++++++++++
.../sidecar/db/schema/ActiveClusterOpsSchema.java | 139 +++++++++++++
.../db/schema/ClusterOpsNodeStateSchema.java | 132 ++++++++++++
.../sidecar/db/schema/ClusterOpsSchema.java | 198 ++++++++++++++++++
.../job/storage/CassandraStorageProvider.java | 230 +++++++++++++++++++++
.../sidecar/job/storage/OperationalJobRecord.java | 191 +++++++++++++++++
.../sidecar/job/storage/StorageProvider.java | 204 ++++++++++++++++++
.../job/storage/StorageProviderException.java | 52 +++++
.../sidecar/modules/OperationalJobsModule.java | 79 +++++++
.../cassandra/sidecar/modules/SidecarModules.java | 1 +
.../modules/multibindings/TableSchemaMapKeys.java | 3 +
...eClusterOpsDatabaseAccessorIntegrationTest.java | 111 ++++++++++
.../ClusterOpsDatabaseAccessorIntegrationTest.java | 160 ++++++++++++++
...psNodeStateDatabaseAccessorIntegrationTest.java | 121 +++++++++++
.../yaml/OperationalJobConfigurationImplTest.java | 49 +++++
.../cassandra/sidecar/db/SidecarSchemaTest.java | 32 ++-
24 files changed, 2280 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bb99b4e..a26611c5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.5.0
-----
+ * Define storage provider interface and Cassandra implementation to support durable job tracking (CASSSIDECAR-373)
* Validate snapshot name during list snapshot (CASSSIDECAR-461)
0.4.0
diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationType.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationType.java
new file mode 100644
index 00000000..479fbc7b
--- /dev/null
+++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+/**
+ * Enumerates the types of operational jobs that can be performed on a Cassandra cluster.
+ */
+public enum OperationType
+{
+ DECOMMISSION,
+ DRAIN,
+ MOVE,
+ REPAIR
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/OperationalJobConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/OperationalJobConfiguration.java
new file mode 100644
index 00000000..9c20d5ab
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/config/OperationalJobConfiguration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+
+/**
+ * Configuration for operational jobs managed by Sidecar
+ */
+public interface OperationalJobConfiguration
+{
+ /**
+ * @return the time-to-live for operational job tables
+ */
+ SecondBoundConfiguration tablesTtl();
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index 82f6cda3..8a9fab23 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -120,4 +120,9 @@ public interface SidecarConfiguration
* @return the configuration for lifecycle management
*/
LifecycleConfiguration lifecycleConfiguration();
+
+ /**
+ * @return the configuration for operational jobs
+ */
+ OperationalJobConfiguration operationalJobConfiguration();
}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImpl.java
new file mode 100644
index 00000000..8cc1ca42
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.OperationalJobConfiguration;
+
+/**
+ * Configuration for operational jobs managed by Sidecar
+ */
+public class OperationalJobConfigurationImpl implements
OperationalJobConfiguration
+{
+ private static final SecondBoundConfiguration DEFAULT_TABLES_TTL =
SecondBoundConfiguration.parse("90d");
+ private static final SecondBoundConfiguration MIN_TABLES_TTL =
SecondBoundConfiguration.parse("14d");
+
+ protected SecondBoundConfiguration tablesTtl;
+
+ public OperationalJobConfigurationImpl()
+ {
+ this(builder());
+ }
+
+ protected OperationalJobConfigurationImpl(Builder builder)
+ {
+ this.tablesTtl = builder.tablesTtl;
+ validate();
+ }
+
+ private void validate()
+ {
+ if (tablesTtl.compareTo(MIN_TABLES_TTL) < 0)
+ {
+ throw new IllegalArgumentException("tablesTtl cannot be less than
" + MIN_TABLES_TTL);
+ }
+ }
+
+ @Override
+ @JsonProperty(value = "tables_ttl")
+ public SecondBoundConfiguration tablesTtl()
+ {
+ return tablesTtl;
+ }
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ /**
+ * {@code OperationalJobConfigurationImpl} builder static inner class.
+ */
+ public static class Builder implements DataObjectBuilder<Builder,
OperationalJobConfigurationImpl>
+ {
+ private SecondBoundConfiguration tablesTtl = DEFAULT_TABLES_TTL;
+
+ protected Builder()
+ {
+ }
+
+ @Override
+ public Builder self()
+ {
+ return this;
+ }
+
+ public Builder tablesTtl(SecondBoundConfiguration tablesTtl)
+ {
+ return update(b -> b.tablesTtl = tablesTtl);
+ }
+
+ @Override
+ public OperationalJobConfigurationImpl build()
+ {
+ return new OperationalJobConfigurationImpl(this);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
index ef364cf1..58781bd4 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
@@ -45,6 +45,7 @@ import
org.apache.cassandra.sidecar.config.InstanceConfiguration;
import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
import org.apache.cassandra.sidecar.config.MetricsConfiguration;
+import org.apache.cassandra.sidecar.config.OperationalJobConfiguration;
import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
@@ -122,6 +123,9 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
@JsonProperty("lifecycle")
private LifecycleConfiguration lifecycleConfiguration;
+ @JsonProperty("operational_job")
+ private OperationalJobConfiguration operationalJobConfiguration;
+
public SidecarConfigurationImpl()
{
this(builder());
@@ -147,6 +151,7 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
schemaReportingConfiguration = builder.schemaReportingConfiguration;
liveMigrationConfiguration = builder.liveMigrationConfiguration;
lifecycleConfiguration = builder.lifecycleConfiguration;
+ operationalJobConfiguration = builder.operationalJobConfiguration;
}
/**
@@ -327,6 +332,12 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
return lifecycleConfiguration;
}
+ @Override
+ @JsonProperty("operational_job")
+ public OperationalJobConfiguration operationalJobConfiguration()
+ {
+ return operationalJobConfiguration;
+ }
public static SidecarConfigurationImpl readYamlConfiguration(Path
yamlConfigurationPath) throws IOException
{
@@ -444,6 +455,7 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
private SchemaReportingConfiguration schemaReportingConfiguration =
new SchemaReportingConfigurationImpl();
private LiveMigrationConfiguration liveMigrationConfiguration = new
LiveMigrationConfigurationImpl();
private LifecycleConfiguration lifecycleConfiguration = new
LifecycleConfigurationImpl();
+ private OperationalJobConfiguration operationalJobConfiguration = new
OperationalJobConfigurationImpl();
protected Builder()
{
@@ -659,6 +671,11 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
return update(b -> b.lifecycleConfiguration =
lifecycleConfiguration);
}
+ public Builder operationalJobConfiguration(OperationalJobConfiguration
operationalJobConfiguration)
+ {
+ return update(b -> b.operationalJobConfiguration =
operationalJobConfiguration);
+ }
+
/**
* Returns a {@code SidecarConfigurationImpl} built from the parameters previously set.
*
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java
new file mode 100644
index 00000000..0c48138d
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code active_cluster_ops} table.
+ * Provides mutual exclusion for active operations via lightweight transactions (LWT).
+ */
+@Singleton
+public class ActiveClusterOpsDatabaseAccessor extends
DatabaseAccessor<ActiveClusterOpsSchema>
+{
+ @Inject
+ public ActiveClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema,
CQLSessionProvider sessionProvider)
+ {
+ this(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class),
sessionProvider);
+ }
+
+ @VisibleForTesting
+ public ActiveClusterOpsDatabaseAccessor(ActiveClusterOpsSchema schema,
CQLSessionProvider sessionProvider)
+ {
+ super(schema, sessionProvider);
+ }
+
+ public boolean trySetActiveOperation(String clusterName, String datacenter,
+ OperationType operationType, UUID
operationId)
+ {
+ BoundStatement statement = tableSchema.trySetActive()
+ .bind(clusterName, datacenter,
operationType.name(), operationId);
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
+ ResultSet resultSet = execute(statement);
+ return resultSet.wasApplied();
+ }
+
+ @Nullable
+ public UUID getActiveOperation(String clusterName, String datacenter,
OperationType operationType)
+ {
+ BoundStatement statement =
tableSchema.getActiveByType().bind(clusterName, datacenter,
operationType.name());
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ ResultSet resultSet = execute(statement);
+ Row row = resultSet.one();
+ return row == null ? null : row.getUUID("operation_id");
+ }
+
+ @NotNull
+ public Map<OperationType, UUID> getActiveOperations(String clusterName,
String datacenter)
+ {
+ BoundStatement statement = tableSchema.getActive().bind(clusterName,
datacenter);
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ ResultSet resultSet = execute(statement);
+ Map<OperationType, UUID> activeOps = new HashMap<>();
+ for (Row row : resultSet)
+ {
+
activeOps.put(OperationType.valueOf(row.getString("operation_type")),
row.getUUID("operation_id"));
+ }
+ return activeOps;
+ }
+
+ public boolean clearActiveOperation(String clusterName, String datacenter,
+ OperationType operationType, UUID
operationId)
+ {
+ BoundStatement statement = tableSchema.clearActive()
+ .bind(clusterName, datacenter,
operationType.name(), operationId);
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
+ ResultSet resultSet = execute(statement);
+ return resultSet.wasApplied();
+ }
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessor.java
new file mode 100644
index 00000000..eea6f769
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessor.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.reflect.TypeToken;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.job.storage.OperationalJobRecord;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code cluster_ops} table.
+ */
+@Singleton
+public class ClusterOpsDatabaseAccessor extends
DatabaseAccessor<ClusterOpsSchema>
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterOpsDatabaseAccessor.class);
+ private static final TypeToken<List<List<UUID>>> NODES_ORDER_TYPE = new
TypeToken<List<List<UUID>>>() {};
+
+ @Inject
+ public ClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema,
CQLSessionProvider sessionProvider)
+ {
+ this(sidecarSchema.tableSchema(ClusterOpsSchema.class),
sessionProvider);
+ }
+
+ @VisibleForTesting
+ public ClusterOpsDatabaseAccessor(ClusterOpsSchema schema,
CQLSessionProvider sessionProvider)
+ {
+ super(schema, sessionProvider);
+ }
+
+ public void persistJob(String clusterName, OperationalJobRecord job)
+ {
+ Date lastUpdate = Date.from(Instant.now());
+ Date startTime = job.startTime() != null ? Date.from(job.startTime())
: null;
+ BoundStatement statement = tableSchema.insertJob()
+ .bind(clusterName,
+ job.jobId(),
+ job.operationType().name(),
+ job.status().name(),
+ startTime,
+ lastUpdate,
+ job.failureReason(),
+ job.nodeExecutionOrder(),
+ job.operationMetadata());
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ execute(statement);
+ }
+
+ @Nullable
+ public OperationalJobRecord findJob(String clusterName, UUID jobId)
+ {
+ BoundStatement statement = tableSchema.selectJob().bind(clusterName,
jobId);
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ ResultSet resultSet = execute(statement);
+ Row row = resultSet.one();
+ if (row == null)
+ {
+ return null;
+ }
+ if (!resultSet.isExhausted())
+ {
+ LOGGER.warn("Multiple rows found for operation_id={} in
cluster={}. Using first match.", jobId, clusterName);
+ }
+ return recordFromRow(row);
+ }
+
+ public void updateJobStatus(String clusterName, UUID jobId, OperationType
operationType,
+ OperationalJobStatus status, @Nullable String
failureReason)
+ {
+ Date lastUpdate = Date.from(Instant.now());
+ BoundStatement statement;
+ if (failureReason != null)
+ {
+ statement = tableSchema.updateStatusWithFailure()
+ .bind(status.name(), lastUpdate,
failureReason,
+ clusterName, jobId,
operationType.name());
+ }
+ else if (status == OperationalJobStatus.RUNNING &&
shouldSetStartTime(clusterName, jobId))
+ {
+ statement = tableSchema.updateStatusWithStartTime()
+ .bind(status.name(), lastUpdate, lastUpdate,
+ clusterName, jobId,
operationType.name());
+ }
+ else
+ {
+ statement = tableSchema.updateStatus()
+ .bind(status.name(), lastUpdate,
+ clusterName, jobId,
operationType.name());
+ }
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ execute(statement);
+ }
+
+ private boolean shouldSetStartTime(String clusterName, UUID jobId)
+ {
+ OperationalJobRecord existing = findJob(clusterName, jobId);
+ return existing != null && existing.startTime() == null;
+ }
+
+ @NotNull
+ public List<OperationalJobRecord> findAllJobs(String clusterName, int
limit)
+ {
+ BoundStatement statement = tableSchema.findAllJobs().bind(clusterName,
limit);
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ ResultSet resultSet = execute(statement);
+ List<OperationalJobRecord> records = new ArrayList<>();
+ for (Row row : resultSet)
+ {
+ records.add(recordFromRow(row));
+ }
+ return records;
+ }
+
+ private OperationalJobRecord recordFromRow(Row row)
+ {
+ UUID operationId = row.getUUID("operation_id");
+ OperationType operationType =
OperationType.valueOf(row.getString("operation_type"));
+ OperationalJobStatus status =
OperationalJobStatus.valueOf(row.getString("status"));
+ Date startTimeDate = row.getTimestamp("start_time");
+ Instant startTime = startTimeDate != null ? startTimeDate.toInstant()
: null;
+ Date lastUpdateDate = row.getTimestamp("last_update");
+ Instant lastUpdate = lastUpdateDate != null ?
lastUpdateDate.toInstant() : null;
+ String failureReason = row.getString("failure_reason");
+ List<List<UUID>> nodeExecutionOrder = row.get("node_execution_order",
NODES_ORDER_TYPE);
+ if (nodeExecutionOrder != null && nodeExecutionOrder.isEmpty())
+ {
+ nodeExecutionOrder = null;
+ }
+ Map<String, String> operationMetadata =
row.getMap("operation_metadata", String.class, String.class);
+ if (operationMetadata.isEmpty())
+ {
+ operationMetadata = null;
+ }
+ return new OperationalJobRecord(operationId, operationType, status,
+ startTime, lastUpdate, failureReason,
+ nodeExecutionOrder, operationMetadata);
+ }
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessor.java
new file mode 100644
index 00000000..fee68e91
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsNodeStateSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code cluster_ops_node_state} table.
+ */
+@Singleton
+public class ClusterOpsNodeStateDatabaseAccessor extends
DatabaseAccessor<ClusterOpsNodeStateSchema>
+{
+ private static final int DEFAULT_BATCH_CHUNK_SIZE = 100;
+
+ private final int batchChunkSize;
+
+ @Inject
+ public ClusterOpsNodeStateDatabaseAccessor(SidecarSchema sidecarSchema,
CQLSessionProvider sessionProvider)
+ {
+ this(sidecarSchema.tableSchema(ClusterOpsNodeStateSchema.class),
sessionProvider, DEFAULT_BATCH_CHUNK_SIZE);
+ }
+
+ @VisibleForTesting
+ public ClusterOpsNodeStateDatabaseAccessor(ClusterOpsNodeStateSchema
schema, CQLSessionProvider sessionProvider,
+ int batchChunkSize)
+ {
+ super(schema, sessionProvider);
+ this.batchChunkSize = batchChunkSize;
+ }
+
+ public void updateNodeStatus(String clusterName, UUID operationId, UUID
nodeId, OperationalJobStatus nodeStatus)
+ {
+ BoundStatement statement = tableSchema.insertNodeStatus()
+ .bind(clusterName, operationId,
nodeId, nodeStatus.name());
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ execute(statement);
+ }
+
+ public void updateNodeStatuses(String clusterName, UUID operationId,
+ List<UUID> nodeIds, OperationalJobStatus
nodeStatus)
+ {
+ for (int i = 0; i < nodeIds.size(); i += batchChunkSize)
+ {
+ List<UUID> chunk = nodeIds.subList(i, Math.min(i + batchChunkSize,
nodeIds.size()));
+ BatchStatement batch = new
BatchStatement(BatchStatement.Type.UNLOGGED);
+ for (UUID nodeId : chunk)
+ {
+ BoundStatement statement = tableSchema.insertNodeStatus()
+ .bind(clusterName,
operationId, nodeId, nodeStatus.name());
+ batch.add(statement);
+ }
+ batch.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ execute(batch);
+ }
+ }
+
+ @Nullable
+ public OperationalJobStatus getNodeStatus(String clusterName, UUID
operationId, UUID nodeId)
+ {
+ BoundStatement statement =
tableSchema.selectNodeStatus().bind(clusterName, operationId, nodeId);
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ ResultSet resultSet = execute(statement);
+ Row row = resultSet.one();
+ if (row == null)
+ {
+ return null;
+ }
+ return OperationalJobStatus.valueOf(row.getString("node_status"));
+ }
+
+ @NotNull
+ public Map<UUID, OperationalJobStatus> getNodeStatusesForOperation(String
clusterName, UUID operationId)
+ {
+ BoundStatement statement =
tableSchema.selectAllNodeStatuses().bind(clusterName, operationId);
+ statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ ResultSet resultSet = execute(statement);
+ Map<UUID, OperationalJobStatus> statuses = new HashMap<>();
+ for (Row row : resultSet)
+ {
+ UUID nodeId = row.getUUID("node_id");
+ OperationalJobStatus status =
OperationalJobStatus.valueOf(row.getString("node_status"));
+ statuses.put(nodeId, status);
+ }
+ return statuses;
+ }
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ActiveClusterOpsSchema.java b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ActiveClusterOpsSchema.java
new file mode 100644
index 00000000..7b0b6bef
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ActiveClusterOpsSchema.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema for the {@code active_cluster_ops} table, which ensures that only one active operation of a given type
+ * is running at a time across the cluster. When a Sidecar instance receives a request to set an operation to
+ * {@code RUNNING}, it inserts the operation into this table using a lightweight transaction (LWT) to guarantee
+ * mutual exclusion. Sidecar instances also periodically query this table to discover when an operation has been
+ * activated.
+ */
+public class ActiveClusterOpsSchema extends TableSchema
+{
+ private static final String TABLE_NAME = "active_cluster_ops";
+
+ private final SchemaKeyspaceConfiguration keyspaceConfig;
+ private final SecondBoundConfiguration tableTtl;
+
+ private PreparedStatement trySetActive;
+ private PreparedStatement getActive;
+ private PreparedStatement getActiveByType;
+ private PreparedStatement clearActive;
+
+ public ActiveClusterOpsSchema(SchemaKeyspaceConfiguration keyspaceConfig,
SecondBoundConfiguration tableTtl)
+ {
+ this.keyspaceConfig = keyspaceConfig;
+ this.tableTtl = tableTtl;
+ }
+
+ @Override
+ protected String keyspaceName()
+ {
+ return keyspaceConfig.keyspace();
+ }
+
+ @Override
+ protected String tableName()
+ {
+ return TABLE_NAME;
+ }
+
+ @Override
+ protected String createSchemaStatement()
+ {
+ return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " cluster_name text," +
+ " datacenter text," +
+ " operation_type text," +
+ " operation_id timeuuid," +
+ " PRIMARY KEY ((cluster_name, datacenter),
operation_type)" +
+ ") WITH compaction = {'class':
'LeveledCompactionStrategy'}" +
+ " AND default_time_to_live = %s",
+ keyspaceConfig.keyspace(), TABLE_NAME,
tableTtl.toSeconds());
+ }
+
+ @Override
+ protected void prepareStatements(@NotNull Session session)
+ {
+ trySetActive = prepare(trySetActive, session,
CqlLiterals.trySetActive(keyspaceConfig));
+ getActive = prepare(getActive, session,
CqlLiterals.getActive(keyspaceConfig));
+ getActiveByType = prepare(getActiveByType, session,
CqlLiterals.getActiveByType(keyspaceConfig));
+ clearActive = prepare(clearActive, session,
CqlLiterals.clearActive(keyspaceConfig));
+ }
+
+ public PreparedStatement trySetActive()
+ {
+ return trySetActive;
+ }
+
+ public PreparedStatement getActive()
+ {
+ return getActive;
+ }
+
+ public PreparedStatement getActiveByType()
+ {
+ return getActiveByType;
+ }
+
+ public PreparedStatement clearActive()
+ {
+ return clearActive;
+ }
+
+ private static class CqlLiterals
+ {
+ static String trySetActive(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("INSERT INTO %s.%s (cluster_name, datacenter,
operation_type, operation_id) " +
+ "VALUES (?, ?, ?, ?) IF NOT EXISTS", config);
+ }
+
+ static String getActive(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("SELECT operation_type, operation_id FROM %s.%s "
+
+ "WHERE cluster_name = ? AND datacenter = ?",
config);
+ }
+
+ static String getActiveByType(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("SELECT operation_id FROM %s.%s " +
+ "WHERE cluster_name = ? AND datacenter = ? AND
operation_type = ?", config);
+ }
+
+ static String clearActive(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("DELETE FROM %s.%s " +
+ "WHERE cluster_name = ? AND datacenter = ? AND
operation_type = ? " +
+ "IF operation_id = ?", config);
+ }
+
+ private static String withTable(String format,
SchemaKeyspaceConfiguration config)
+ {
+ return String.format(format, config.keyspace(), TABLE_NAME);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsNodeStateSchema.java b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsNodeStateSchema.java
new file mode 100644
index 00000000..5f7aa0b5
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsNodeStateSchema.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema for the {@code cluster_ops_node_state} table, which tracks the
status of an operation for a given node.
+ * Sidecar instances query this table to check the status of nodes being
operated on before their local nodes,
+ * enabling distributed coordination of cluster-wide operations.
+ * <p>
+ * The {@code cluster_name} partition key identifies the cluster being
operated on, allowing a single
+ * Cassandra cluster to store operational job state for multiple managed
clusters.
+ * <p>
+ * The full partition key is {@code (cluster_name, operation_id)} and
+ * {@code node_id} is the clustering column, so each partition holds the
+ * per-node statuses of a single operation.
+ */
+public class ClusterOpsNodeStateSchema extends TableSchema
+{
+ private static final String TABLE_NAME = "cluster_ops_node_state";
+
+ private final SchemaKeyspaceConfiguration keyspaceConfig;
+ // Written into the table definition as default_time_to_live, in seconds.
+ private final SecondBoundConfiguration tableTtl;
+
+ // The three statements below are assigned by prepareStatements(Session).
+ private PreparedStatement insertNodeStatus;
+ private PreparedStatement selectNodeStatus;
+ private PreparedStatement selectAllNodeStatuses;
+
+ /**
+ * @param keyspaceConfig supplies the keyspace that holds the table
+ * @param tableTtl time-to-live used as the table's
+ * {@code default_time_to_live}
+ */
+ public ClusterOpsNodeStateSchema(SchemaKeyspaceConfiguration
keyspaceConfig, SecondBoundConfiguration tableTtl)
+ {
+ this.keyspaceConfig = keyspaceConfig;
+ this.tableTtl = tableTtl;
+ }
+
+ /**
+ * @return the keyspace name taken from the keyspace configuration
+ */
+ @Override
+ protected String keyspaceName()
+ {
+ return keyspaceConfig.keyspace();
+ }
+
+ /**
+ * @return the constant table name {@code cluster_ops_node_state}
+ */
+ @Override
+ protected String tableName()
+ {
+ return TABLE_NAME;
+ }
+
+ /**
+ * Builds the {@code CREATE TABLE IF NOT EXISTS} statement for the table:
+ * columns cluster_name, operation_id, node_id and node_status, primary
+ * key {@code ((cluster_name, operation_id), node_id)}, leveled
+ * compaction, and a default TTL of {@code tableTtl.toSeconds()}.
+ *
+ * @return the DDL statement with keyspace, table name and TTL filled in
+ */
+ @Override
+ protected String createSchemaStatement()
+ {
+ return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " cluster_name text," +
+ " operation_id timeuuid," +
+ " node_id uuid," +
+ " node_status text," +
+ " PRIMARY KEY ((cluster_name, operation_id),
node_id)" +
+ ") WITH compaction = {'class':
'LeveledCompactionStrategy'}" +
+ " AND default_time_to_live = %s",
+ keyspaceConfig.keyspace(), TABLE_NAME,
tableTtl.toSeconds());
+ }
+
+ /**
+ * Prepares the insert and the two select statements through
+ * {@code prepare}, handing it each field's current value, the session
+ * and the CQL text from {@code CqlLiterals}, and stores the results back
+ * into the fields.
+ *
+ * @param session the session the statements are prepared against
+ */
+ @Override
+ protected void prepareStatements(@NotNull Session session)
+ {
+ insertNodeStatus = prepare(insertNodeStatus, session,
CqlLiterals.insertNodeStatus(keyspaceConfig));
+ selectNodeStatus = prepare(selectNodeStatus, session,
CqlLiterals.selectNodeStatus(keyspaceConfig));
+ selectAllNodeStatuses = prepare(selectAllNodeStatuses, session,
CqlLiterals.selectAllNodeStatuses(keyspaceConfig));
+ }
+
+ /**
+ * @return the insert statement; bind order is cluster_name, operation_id,
+ * node_id, node_status
+ */
+ public PreparedStatement insertNodeStatus()
+ {
+ return insertNodeStatus;
+ }
+
+ /**
+ * @return the single-node select; bind order is cluster_name,
+ * operation_id, node_id
+ */
+ public PreparedStatement selectNodeStatus()
+ {
+ return selectNodeStatus;
+ }
+
+ /**
+ * @return the per-operation select; bind order is cluster_name,
+ * operation_id
+ */
+ public PreparedStatement selectAllNodeStatuses()
+ {
+ return selectAllNodeStatuses;
+ }
+
+ /**
+ * CQL text for the prepared statements. Each template has two {@code %s}
+ * placeholders that {@code withTable} fills with the keyspace and
+ * {@code TABLE_NAME}.
+ */
+ private static class CqlLiterals
+ {
+ /**
+ * Unconditional insert of one node-status row.
+ */
+ static String insertNodeStatus(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("INSERT INTO %s.%s (" +
+ " cluster_name," +
+ " operation_id," +
+ " node_id," +
+ " node_status" +
+ ") VALUES (?, ?, ?, ?)", config);
+ }
+
+ /**
+ * Select of node_status for a fully specified primary key.
+ */
+ static String selectNodeStatus(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("SELECT node_status " +
+ "FROM %s.%s " +
+ "WHERE cluster_name = ? AND operation_id = ? AND
node_id = ?", config);
+ }
+
+ /**
+ * Select of node_id and node_status for every row in one
+ * (cluster_name, operation_id) partition.
+ */
+ static String selectAllNodeStatuses(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("SELECT node_id, node_status " +
+ "FROM %s.%s " +
+ "WHERE cluster_name = ? AND operation_id = ?",
config);
+ }
+
+ /**
+ * Formats {@code format} with the keyspace from {@code config} and
+ * {@code TABLE_NAME}, in that order.
+ */
+ private static String withTable(String format,
SchemaKeyspaceConfiguration config)
+ {
+ return String.format(format, config.keyspace(), TABLE_NAME);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsSchema.java
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsSchema.java
new file mode 100644
index 00000000..46f2b237
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsSchema.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema for the {@code cluster_ops} table, which persists and tracks active
and past operational jobs.
+ * When durable operational job storage is used, all operational job APIs
query from this table to retrieve
+ * job state, including the operation type, status, node execution order, and
operation metadata.
+ * <p>
+ * The {@code cluster_name} partition key identifies the cluster being
operated on, allowing a single
+ * Cassandra cluster to store operational job state for multiple managed
clusters.
+ * <p>
+ * Within a partition, rows are clustered by {@code operation_id} descending
+ * and then {@code operation_type} ascending.
+ */
+public class ClusterOpsSchema extends TableSchema
+{
+ private static final String TABLE_NAME = "cluster_ops";
+
+ private final SchemaKeyspaceConfiguration keyspaceConfig;
+ // Written into the table definition as default_time_to_live, in seconds.
+ private final SecondBoundConfiguration tableTtl;
+
+ // All statements below are assigned by prepareStatements(Session).
+ private PreparedStatement insertJob;
+ private PreparedStatement selectJob;
+ private PreparedStatement updateStatus;
+ private PreparedStatement updateStatusWithStartTime;
+ private PreparedStatement updateStatusWithFailure;
+ private PreparedStatement findAllJobs;
+
+ /**
+ * @param keyspaceConfig supplies the keyspace that holds the table
+ * @param tableTtl time-to-live used as the table's
+ * {@code default_time_to_live}
+ */
+ public ClusterOpsSchema(SchemaKeyspaceConfiguration keyspaceConfig,
SecondBoundConfiguration tableTtl)
+ {
+ this.keyspaceConfig = keyspaceConfig;
+ this.tableTtl = tableTtl;
+ }
+
+ /**
+ * @return the keyspace name taken from the keyspace configuration
+ */
+ @Override
+ protected String keyspaceName()
+ {
+ return keyspaceConfig.keyspace();
+ }
+
+ /**
+ * @return the constant table name {@code cluster_ops}
+ */
+ @Override
+ protected String tableName()
+ {
+ return TABLE_NAME;
+ }
+
+ /**
+ * Builds the {@code CREATE TABLE IF NOT EXISTS} statement for the table:
+ * primary key {@code ((cluster_name), operation_id, operation_type)},
+ * clustering order operation_id DESC / operation_type ASC, leveled
+ * compaction, and a default TTL of {@code tableTtl.toSeconds()}.
+ *
+ * @return the DDL statement with keyspace, table name and TTL filled in
+ */
+ @Override
+ protected String createSchemaStatement()
+ {
+ return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " cluster_name text," +
+ " operation_id timeuuid," +
+ " operation_type text," +
+ " status text," +
+ " start_time timestamp," +
+ " last_update timestamp," +
+ " failure_reason text," +
+ " node_execution_order
frozen<list<frozen<list<uuid>>>>," +
+ " operation_metadata frozen<map<text, text>>," +
+ " PRIMARY KEY ((cluster_name), operation_id,
operation_type)" +
+ ") WITH CLUSTERING ORDER BY (operation_id DESC,
operation_type ASC)" +
+ " AND compaction = {'class':
'LeveledCompactionStrategy'}" +
+ " AND default_time_to_live = %s",
+ keyspaceConfig.keyspace(), TABLE_NAME,
tableTtl.toSeconds());
+ }
+
+ /**
+ * Prepares all six statements through {@code prepare}, handing it each
+ * field's current value, the session and the CQL text from
+ * {@code CqlLiterals}, and stores the results back into the fields.
+ *
+ * @param session the session the statements are prepared against
+ */
+ @Override
+ protected void prepareStatements(@NotNull Session session)
+ {
+ insertJob = prepare(insertJob, session,
CqlLiterals.insertJob(keyspaceConfig));
+ selectJob = prepare(selectJob, session,
CqlLiterals.selectJob(keyspaceConfig));
+ updateStatus = prepare(updateStatus, session,
CqlLiterals.updateStatus(keyspaceConfig));
+ updateStatusWithStartTime = prepare(updateStatusWithStartTime,
session, CqlLiterals.updateStatusWithStartTime(keyspaceConfig));
+ updateStatusWithFailure = prepare(updateStatusWithFailure, session,
CqlLiterals.updateStatusWithFailure(keyspaceConfig));
+ findAllJobs = prepare(findAllJobs, session,
CqlLiterals.findAllJobs(keyspaceConfig));
+ }
+
+ /**
+ * @return the insert statement; bind order is cluster_name, operation_id,
+ * operation_type, status, start_time, last_update,
+ * failure_reason, node_execution_order, operation_metadata
+ */
+ public PreparedStatement insertJob()
+ {
+ return insertJob;
+ }
+
+ /**
+ * @return the job select; bind order is cluster_name, operation_id
+ */
+ public PreparedStatement selectJob()
+ {
+ return selectJob;
+ }
+
+ /**
+ * @return the status update; bind order is status, last_update,
+ * cluster_name, operation_id, operation_type
+ */
+ public PreparedStatement updateStatus()
+ {
+ return updateStatus;
+ }
+
+ /**
+ * @return the status update that also sets start_time; bind order is
+ * status, last_update, start_time, cluster_name, operation_id,
+ * operation_type
+ */
+ public PreparedStatement updateStatusWithStartTime()
+ {
+ return updateStatusWithStartTime;
+ }
+
+ /**
+ * @return the status update that also sets failure_reason; bind order is
+ * status, last_update, failure_reason, cluster_name,
+ * operation_id, operation_type
+ */
+ public PreparedStatement updateStatusWithFailure()
+ {
+ return updateStatusWithFailure;
+ }
+
+ /**
+ * @return the per-cluster listing; bind order is cluster_name, then the
+ * row limit
+ */
+ public PreparedStatement findAllJobs()
+ {
+ return findAllJobs;
+ }
+
+ /**
+ * CQL text for the prepared statements. Each template has two {@code %s}
+ * placeholders that {@code withTable} fills with the keyspace and
+ * {@code TABLE_NAME}.
+ */
+ private static class CqlLiterals
+ {
+ /**
+ * Unconditional insert that writes every column of a job row.
+ */
+ static String insertJob(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("INSERT INTO %s.%s (" +
+ " cluster_name," +
+ " operation_id," +
+ " operation_type," +
+ " status," +
+ " start_time," +
+ " last_update," +
+ " failure_reason," +
+ " node_execution_order," +
+ " operation_metadata" +
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", config);
+ }
+
+ /**
+ * Select of every column for a cluster_name and operation_id. The
+ * second clustering column, operation_type, is not constrained here.
+ */
+ static String selectJob(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("SELECT cluster_name, " +
+ "operation_id, " +
+ "operation_type, " +
+ "status, " +
+ "start_time, " +
+ "last_update, " +
+ "failure_reason, " +
+ "node_execution_order, " +
+ "operation_metadata " +
+ "FROM %s.%s " +
+ "WHERE cluster_name = ? AND operation_id = ?",
config);
+ }
+
+ /**
+ * Update of status and last_update for one fully specified primary key.
+ * NOTE(review): the statement carries no IF EXISTS condition; confirm
+ * callers only target rows previously written by insertJob.
+ */
+ static String updateStatus(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("UPDATE %s.%s SET status = ?, last_update = ? " +
+ "WHERE cluster_name = ? AND operation_id = ? AND
operation_type = ?", config);
+ }
+
+ /**
+ * Same as {@code updateStatus}, additionally setting start_time.
+ */
+ static String updateStatusWithStartTime(SchemaKeyspaceConfiguration
config)
+ {
+ return withTable("UPDATE %s.%s SET status = ?, last_update = ?,
start_time = ? " +
+ "WHERE cluster_name = ? AND operation_id = ? AND
operation_type = ?", config);
+ }
+
+ /**
+ * Same as {@code updateStatus}, additionally setting failure_reason.
+ */
+ static String updateStatusWithFailure(SchemaKeyspaceConfiguration
config)
+ {
+ return withTable("UPDATE %s.%s SET status = ?, last_update = ?,
failure_reason = ? " +
+ "WHERE cluster_name = ? AND operation_id = ? AND
operation_type = ?", config);
+ }
+
+ /**
+ * Select of every column for one cluster_name, capped by a bound
+ * {@code LIMIT}. The statement has no ORDER BY, so row order is the
+ * table's clustering order (operation_id DESC, operation_type ASC).
+ */
+ static String findAllJobs(SchemaKeyspaceConfiguration config)
+ {
+ return withTable("SELECT cluster_name, " +
+ "operation_id, " +
+ "operation_type, " +
+ "status, " +
+ "start_time, " +
+ "last_update, " +
+ "failure_reason, " +
+ "node_execution_order, " +
+ "operation_metadata " +
+ "FROM %s.%s " +
+ "WHERE cluster_name = ? LIMIT ?", config);
+ }
+
+ /**
+ * Formats {@code format} with the keyspace from {@code config} and
+ * {@code TABLE_NAME}, in that order.
+ */
+ private static String withTable(String format,
SchemaKeyspaceConfiguration config)
+ {
+ return String.format(format, config.keyspace(), TABLE_NAME);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/CassandraStorageProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/CassandraStorageProvider.java
new file mode 100644
index 00000000..47781845
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/CassandraStorageProvider.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.ActiveClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsNodeStateDatabaseAccessor;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A {@link StorageProvider} implementation backed by Cassandra system tables.
+ * Delegates to three database accessors for cluster operations, node state,
and active operation coordination.
+ * Each instance is scoped to a single cluster identified by {@code
clusterName}.
+ * <p>
+ * Every data operation goes through {@code execute}, which rejects calls made
+ * before {@link #initialize()} has resolved the cluster name and datacenter
+ * and wraps Cassandra failures in {@link StorageProviderException}.
+ */
+public class CassandraStorageProvider implements StorageProvider
+{
+ private final CQLSessionProvider sessionProvider;
+ private final ClusterOpsDatabaseAccessor clusterOpsAccessor;
+ private final ClusterOpsNodeStateDatabaseAccessor nodeStateAccessor;
+ private final ActiveClusterOpsDatabaseAccessor activeOpsAccessor;
+ // Null until supplied by the constructor or resolved in initialize().
+ private volatile String clusterName;
+ // Null until resolved in initialize().
+ private volatile String datacenter;
+
+ /**
+ * Constructs a CassandraStorageProvider. When {@code clusterName} is
non-null, it indicates that
+ * operational state is stored in a separate Cassandra cluster from the
one Sidecar manages. When null,
+ * the cluster name is derived from the CQL session metadata during {@link
#initialize()}, which is the
+ * default for same-cluster storage.
+ */
+ public CassandraStorageProvider(CQLSessionProvider sessionProvider,
+ ClusterOpsDatabaseAccessor
clusterOpsAccessor,
+ ClusterOpsNodeStateDatabaseAccessor
nodeStateAccessor,
+ ActiveClusterOpsDatabaseAccessor
activeOpsAccessor,
+ @Nullable String clusterName)
+ {
+ this.sessionProvider = sessionProvider;
+ this.clusterOpsAccessor = clusterOpsAccessor;
+ this.nodeStateAccessor = nodeStateAccessor;
+ this.activeOpsAccessor = activeOpsAccessor;
+ this.clusterName = clusterName;
+ }
+
+ @Override
+ public void persistJob(OperationalJobRecord job)
+ {
+ execute("persistJob", () -> {
+ clusterOpsAccessor.persistJob(clusterName, job);
+ return null;
+ });
+ }
+
+ @Override
+ @Nullable
+ public OperationalJobRecord findJob(UUID jobId)
+ {
+ return execute("findJob", () ->
clusterOpsAccessor.findJob(clusterName, jobId));
+ }
+
+ @Override
+ public void updateJobStatus(UUID jobId, OperationType operationType,
OperationalJobStatus status,
+ @Nullable String failureReason)
+ {
+ execute("updateJobStatus", () -> {
+ clusterOpsAccessor.updateJobStatus(clusterName, jobId,
operationType, status, failureReason);
+ return null;
+ });
+ }
+
+ @Override
+ @NotNull
+ public List<OperationalJobRecord> findAllJobs(int limit)
+ {
+ return execute("findAllJobs", () ->
clusterOpsAccessor.findAllJobs(clusterName, limit));
+ }
+
+ @Override
+ public boolean trySetActiveOperation(OperationType operationType, UUID
operationId)
+ {
+ return execute("trySetActiveOperation",
+ () ->
activeOpsAccessor.trySetActiveOperation(clusterName, datacenter,
+
operationType, operationId));
+ }
+
+ @Override
+ @Nullable
+ public UUID getActiveOperation(OperationType operationType)
+ {
+ return execute("getActiveOperation",
+ () -> activeOpsAccessor.getActiveOperation(clusterName,
datacenter, operationType));
+ }
+
+ @Override
+ @NotNull
+ public Map<OperationType, UUID> getActiveOperations()
+ {
+ return execute("getActiveOperations",
+ () ->
activeOpsAccessor.getActiveOperations(clusterName, datacenter));
+ }
+
+ @Override
+ public boolean clearActiveOperation(OperationType operationType, UUID
operationId)
+ {
+ return execute("clearActiveOperation",
+ () ->
activeOpsAccessor.clearActiveOperation(clusterName, datacenter,
+
operationType, operationId));
+ }
+
+ @Override
+ public void updateNodeStatuses(UUID operationId, List<UUID> nodeIds,
OperationalJobStatus nodeStatus)
+ {
+ execute("updateNodeStatuses", () -> {
+ nodeStateAccessor.updateNodeStatuses(clusterName, operationId,
nodeIds, nodeStatus);
+ return null;
+ });
+ }
+
+ @Override
+ public void updateNodeStatus(UUID operationId, UUID nodeId,
OperationalJobStatus nodeStatus)
+ {
+ execute("updateNodeStatus", () -> {
+ nodeStateAccessor.updateNodeStatus(clusterName, operationId,
nodeId, nodeStatus);
+ return null;
+ });
+ }
+
+ @Override
+ @Nullable
+ public OperationalJobStatus getNodeStatus(UUID operationId, UUID nodeId)
+ {
+ return execute("getNodeStatus",
+ () -> nodeStateAccessor.getNodeStatus(clusterName,
operationId, nodeId));
+ }
+
+ @Override
+ @NotNull
+ public Map<UUID, OperationalJobStatus> getNodeStatusesForOperation(UUID
operationId)
+ {
+ return execute("getNodeStatusesForOperation",
+ () ->
nodeStateAccessor.getNodeStatusesForOperation(clusterName, operationId));
+ }
+
+ /**
+ * Resolves the cluster name and datacenter this provider operates on.
+ * A cluster name given to the constructor is kept; otherwise it is read
+ * from the session's cluster metadata. The datacenter is taken from the
+ * first host returned by {@code session.getState().getConnectedHosts()}.
+ * Values that are already set are left untouched, so repeated calls do
+ * not change them.
+ * <p>
+ * NOTE(review): depending on the driver's load-balancing policy the
+ * connected hosts may span datacenters - confirm that the first host is
+ * always in the local datacenter.
+ *
+ * @throws StorageProviderException if no host is connected or the
+ * session provider reports Cassandra
+ * as unavailable
+ */
+ @Override
+ public void initialize()
+ {
+ // Schema initialization is handled by SidecarSchemaInitializer.
+ // TTL on all tables handles record pruning.
+ try
+ {
+ Session session = sessionProvider.get();
+ if (clusterName == null)
+ {
+ clusterName =
session.getCluster().getMetadata().getClusterName();
+ }
+ if (datacenter == null)
+ {
+ Collection<Host> connectedHosts =
session.getState().getConnectedHosts();
+ if (connectedHosts.isEmpty())
+ {
+ throw new StorageProviderException(
+ "Failed to resolve local datacenter: no connected hosts
available");
+ }
+ datacenter = connectedHosts.iterator().next().getDatacenter();
+ }
+ }
+ catch (CassandraUnavailableException e)
+ {
+ throw new StorageProviderException("Failed to initialize storage
provider", e);
+ }
+ }
+
+ /**
+ * @return {@code true} only when all three accessors report themselves
+ * available
+ */
+ @Override
+ public boolean isAvailable()
+ {
+ return clusterOpsAccessor.isAvailable()
+ && nodeStateAccessor.isAvailable()
+ && activeOpsAccessor.isAvailable();
+ }
+
+ @Override
+ public void close()
+ {
+ // Session lifecycle is managed by the DI container.
+ }
+
+ /**
+ * Runs {@code action} after checking that {@link #initialize()} has
+ * resolved both the cluster name and the datacenter.
+ * <p>
+ * Both {@code DriverException} and {@code CassandraUnavailableException}
+ * are wrapped, matching {@link #initialize()}, so callers only ever see
+ * {@link StorageProviderException} for Cassandra-side failures.
+ *
+ * @param operation name of the calling operation, used in the error
+ * message
+ * @param action the accessor call to run
+ * @param <T> the result type of the action
+ * @return whatever {@code action} returns
+ * @throws StorageProviderException if the provider is not initialized or
+ * the action fails against Cassandra
+ */
+ private <T> T execute(String operation, Supplier<T> action)
+ {
+ if (clusterName == null || datacenter == null)
+ {
+ throw new StorageProviderException("StorageProvider has not been
initialized. Call initialize() first.");
+ }
+ try
+ {
+ return action.get();
+ }
+ catch (DriverException | CassandraUnavailableException e)
+ {
+ throw new StorageProviderException("Failed to execute " +
operation, e);
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/OperationalJobRecord.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/OperationalJobRecord.java
new file mode 100644
index 00000000..7e3d508a
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/OperationalJobRecord.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import java.util.UUID;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A data transfer object representing the persisted state of an operational
job.
+ * <p>
+ * All fields are final. The node execution order list and the operation
+ * metadata map are stored and returned by reference, without copying.
+ */
+public class OperationalJobRecord
+{
+ private final UUID jobId;
+ private final OperationType operationType;
+ private final OperationalJobStatus status;
+ // Derived from jobId in the constructor via UUIDs.unixTimestamp.
+ private final long creationTimeMillis;
+ @Nullable
+ private final Instant startTime;
+ @Nullable
+ private final Instant lastUpdate;
+ @Nullable
+ private final String failureReason;
+ @Nullable
+ private final List<List<UUID>> nodeExecutionOrder;
+ @Nullable
+ private final Map<String, String> operationMetadata;
+
+ /**
+ * Constructs an OperationalJobRecord with the given fields, setting
+ * {@code lastUpdate} to {@code Instant.now()} and leaving the start time,
+ * failure reason, node execution order and operation metadata null.
+ *
+ * @param jobId time-based v1 UUID identifying the job
+ * @param operationType the operation type
+ * @param status the current status of the job
+ */
+ public OperationalJobRecord(UUID jobId, OperationType operationType,
OperationalJobStatus status)
+ {
+ this(jobId, operationType, status, null, Instant.now(), null, null,
null);
+ }
+
+ /**
+ * Constructs an OperationalJobRecord with all fields.
+ * <p>
+ * {@code jobId}, {@code operationType} and {@code status} are checked for
+ * null, and {@code jobId} must report UUID version 1; the checks go
+ * through {@code Preconditions.checkArgument} (presumably raising
+ * IllegalArgumentException - confirm against that helper).
+ *
+ * @param jobId time-based v1 UUID identifying the job
+ * @param operationType the operation type
+ * @param status the current status of the job
+ * @param startTime the timestamp when execution started, or null
if not yet started
+ * @param lastUpdate the timestamp of the last status update, or
null for pre-existing rows
+ * @param failureReason the failure reason if the job failed, or null
+ * @param nodeExecutionOrder the ordered list of parallel node
groups for execution, or null
+ * @param operationMetadata the operation parameters, or null
+ */
+ public OperationalJobRecord(UUID jobId, OperationType operationType,
OperationalJobStatus status,
+ @Nullable Instant startTime,
+ @Nullable Instant lastUpdate,
+ @Nullable String failureReason,
+ @Nullable List<List<UUID>> nodeExecutionOrder,
+ @Nullable Map<String, String>
operationMetadata)
+ {
+ // The null check on jobId must run before jobId.version() is called.
+ Preconditions.checkArgument(jobId != null, "jobId must not be null");
+ Preconditions.checkArgument(jobId.version() == 1, "jobId must be a
time-based (v1) UUID");
+ Preconditions.checkArgument(operationType != null, "operationType must
not be null");
+ Preconditions.checkArgument(status != null, "status must not be null");
+ this.jobId = jobId;
+ this.operationType = operationType;
+ this.status = status;
+ // Creation time is not passed in; it is read out of the v1 UUID.
+ this.creationTimeMillis = UUIDs.unixTimestamp(jobId);
+ this.startTime = startTime;
+ this.lastUpdate = lastUpdate;
+ this.failureReason = failureReason;
+ this.nodeExecutionOrder = nodeExecutionOrder;
+ this.operationMetadata = operationMetadata;
+ }
+
+ /**
+ * @return the time-based v1 UUID identifying this job
+ */
+ public UUID jobId()
+ {
+ return jobId;
+ }
+
+ /**
+ * @return the operation type
+ */
+ public OperationType operationType()
+ {
+ return operationType;
+ }
+
+ /**
+ * @return the current status of the job
+ */
+ public OperationalJobStatus status()
+ {
+ return status;
+ }
+
+ /**
+ * @return the unix timestamp in milliseconds when the job was created,
extracted from the time-based UUID
+ */
+ public long creationTimeMillis()
+ {
+ return creationTimeMillis;
+ }
+
+ /**
+ * @return the timestamp when execution started, or null if not yet started
+ */
+ @Nullable
+ public Instant startTime()
+ {
+ return startTime;
+ }
+
+ /**
+ * @return the timestamp of the last status update, or null for
pre-existing rows
+ */
+ @Nullable
+ public Instant lastUpdate()
+ {
+ return lastUpdate;
+ }
+
+ /**
+ * @return the failure reason if the job failed, or null otherwise
+ */
+ @Nullable
+ public String failureReason()
+ {
+ return failureReason;
+ }
+
+ /**
+ * @return the ordered list of parallel node groups for execution, or null
if not set
+ */
+ @Nullable
+ public List<List<UUID>> nodeExecutionOrder()
+ {
+ return nodeExecutionOrder;
+ }
+
+ /**
+ * @return the operation parameters, or null if not set
+ */
+ @Nullable
+ public Map<String, String> operationMetadata()
+ {
+ return operationMetadata;
+ }
+
+ /**
+ * @return a single-line rendering listing every field of the record
+ */
+ @Override
+ public String toString()
+ {
+ return "OperationalJobRecord{" +
+ "jobId=" + jobId +
+ ", operationType=" + operationType +
+ ", status=" + status +
+ ", creationTimeMillis=" + creationTimeMillis +
+ ", startTime=" + startTime +
+ ", lastUpdate=" + lastUpdate +
+ ", failureReason=" + failureReason +
+ ", nodeExecutionOrder=" + nodeExecutionOrder +
+ ", operationMetadata=" + operationMetadata +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java
new file mode 100644
index 00000000..0f931bc5
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A provider-agnostic storage abstraction for durable operational job state.
+ * Each {@code StorageProvider} instance is scoped to a single cluster and
datacenter.
+ * The cluster and datacenter identity is configuration, not a per-call
parameter.
+ * <p>
+ * This interface defines a data access pattern for persisting, modifying, and
querying OperationalJobs.
+ * Higher-level coordination logic, such as clearing the active operation lock
when a job reaches
+ * a terminal state, belongs in the layers that depend on this interface.
+ * <p>
+ * The interface extends {@link Closeable}, so implementations also provide
+ * {@code close()}.
+ */
+public interface StorageProvider extends Closeable
+{
+ // --- Operational Job Storage ---
+
+ /**
+ * Persist a new operational job record.
+ * <p>
+ * Implementations must provide upsert semantics as a retry safety net: if
called again with the
+ * same job ID (e.g., due to a network timeout where the caller does not
know if the first write
+ * succeeded), the existing record should be overwritten rather than
throwing.
+ * <p>
+ * Implementations should throw {@link StorageProviderException} on write
failure.
+ *
+ * @param job the job record to store
+ */
+ void persistJob(OperationalJobRecord job);
+
+ /**
+ * Find a job by its ID.
+ *
+ * @param jobId the job identifier
+ * @return the job record, or {@code null} if not found
+ */
+ @Nullable
+ OperationalJobRecord findJob(UUID jobId);
+
+ /**
+ * Update the status of an existing job.
+ * <p>
+ * Implementations should throw {@link StorageProviderException} on write
failure.
+ *
+ * @param jobId the job identifier
+ * @param operationType the operation type
+ * @param status the new status
+ * @param failureReason the failure reason if the job has failed, or
{@code null} otherwise
+ */
+ void updateJobStatus(UUID jobId, OperationType operationType,
OperationalJobStatus status,
+ @Nullable String failureReason);
+
+ /**
+ * Retrieve stored job records, up to the specified limit. Implementations
should return
+ * records in descending time order.
+ *
+ * @param limit the maximum number of job records to return
+ * @return list of job records, never null
+ */
+ @NotNull
+ List<OperationalJobRecord> findAllJobs(int limit);
+
+ // --- Active Operation Coordination ---
+
+ /**
+ * Set an operation as active if no other operation of the same type is
currently active.
+ * <p>
+ * Implementations must provide compare-and-set (CAS) semantics to ensure
+ * only one active operation of a given type runs at a time within the
+ * cluster and datacenter this provider is scoped to. This lock is per
+ * operation plan, not per node; within a single operation, multiple nodes
+ * may be operated on concurrently as defined by the node execution order.
+ *
+ * @param operationType the operation type
+ * @param operationId the unique identifier for this operation
+ * @return {@code true} if the operation was successfully set as active,
{@code false} if
+ * an operation of the same type is already active (including the
same operation ID)
+ */
+ boolean trySetActiveOperation(OperationType operationType, UUID
operationId);
+
+ /**
+ * Get the active operation ID for a given operation type.
+ *
+ * @param operationType the operation type
+ * @return the active operation ID, or {@code null} if no operation of
this type is active
+ */
+ @Nullable
+ UUID getActiveOperation(OperationType operationType);
+
+ /**
+ * Get all active operations for the local datacenter.
+ *
+ * @return a map of operation type to operation ID for all currently
active operations
+ * in the local datacenter, never null
+ */
+ @NotNull
+ Map<OperationType, UUID> getActiveOperations();
+
+ /**
+ * Clear the active operation lock, but only if the provided operation ID
matches
+ * the currently active one.
+ *
+ * @param operationType the operation type
+ * @param operationId the operation ID to clear
+ * @return {@code true} if the active operation was cleared, {@code false}
if the provided
+ * operation ID did not match the active one
+ */
+ boolean clearActiveOperation(OperationType operationType, UUID
operationId);
+
+ // --- Node Status Tracking ---
+
+ /**
+ * Update the status of multiple nodes for an operation in a single call.
+ * <p>
+ * Implementations should optimize with batch writes where possible. This
method is not
+ * guaranteed to be atomic; if any write fails, implementations should
throw
+ * {@link StorageProviderException}. Callers can safely retry the full
list since
+ * writing the same status to a node is idempotent.
+ *
+ * @param operationId the operation identifier
+ * @param nodeIds the list of node identifiers to update
+ * @param nodeStatus the status to set for all specified nodes
+ */
+ void updateNodeStatuses(UUID operationId, List<UUID> nodeIds,
OperationalJobStatus nodeStatus);
+
+ /**
+ * Update a node's status within an operation.
+ * <p>
+ * Implementations should throw {@link StorageProviderException} on write
failure.
+ *
+ * @param operationId the operation identifier
+ * @param nodeId the node identifier
+ * @param nodeStatus the new status for the node
+ */
+ void updateNodeStatus(UUID operationId, UUID nodeId, OperationalJobStatus
nodeStatus);
+
+ /**
+ * Get a node's current status within an operation.
+ *
+ * @param operationId the operation identifier
+ * @param nodeId the node identifier
+ * @return the node's current status, or {@code null} if not found
+ */
+ @Nullable
+ OperationalJobStatus getNodeStatus(UUID operationId, UUID nodeId);
+
+ /**
+ * Get all node statuses for an operation.
+ *
+ * @param operationId the operation identifier
+ * @return a map of node IDs to their current status, never null
+ */
+ @NotNull
+ Map<UUID, OperationalJobStatus> getNodeStatusesForOperation(UUID
operationId);
+
+ // --- Lifecycle ---
+
+ /**
+ * Initialize the storage provider (e.g. schema creation, connection
setup).
+ * <p>
+ * Implementations must use this method to prepare the StorageProvider to
+ * accept user requests, and must keep it idempotent. No other method
+ * should perform this preparation.
+ * <p>
+ * Callers must invoke this method before calling any other method on this
interface.
+ * <p>
+ * Implementations must ensure that stored records are eventually pruned.
The default
+ * Cassandra implementation relies on table-level TTL. Other
implementations should
+ * establish their own pruning strategy (e.g., scheduled deletes of old
records).
+ */
+ void initialize();
+
+ /**
+ * Returns whether the provider is ready to accept operations.
+ *
+ * @return {@code true} if the provider is initialized and available,
{@code false} otherwise
+ */
+ boolean isAvailable();
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProviderException.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProviderException.java
new file mode 100644
index 00000000..1d8d348f
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProviderException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+/**
+ * A runtime exception that {@link StorageProvider} implementations should
wrap their
+ * backend-specific errors in. This gives callers a single exception type to
catch regardless
+ * of the underlying backend.
+ * <p>
+ * Implementations should catch backend-specific exceptions (e.g., {@code
DriverException}
+ * for Cassandra, {@code SQLException} for JDBC) and wrap them in {@code
StorageProviderException}
+ * with the original exception as the cause.
+ * <p>
+ * Because this type extends {@link RuntimeException}, it is unchecked: the compiler does not
+ * require callers to catch or declare it.
+ */
+public class StorageProviderException extends RuntimeException
+{
+ /**
+ * Constructs a new StorageProviderException with the specified detail
message.
+ *
+ * @param message the detail message
+ */
+ public StorageProviderException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new StorageProviderException with the specified detail
message and cause.
+ *
+ * @param message the detail message
+ * @param cause the underlying cause
+ */
+ public StorageProviderException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/OperationalJobsModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/OperationalJobsModule.java
new file mode 100644
index 00000000..820e7c38
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/OperationalJobsModule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.modules;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.ProvidesIntoMap;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.ActiveClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsNodeStateDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsNodeStateSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.TableSchema;
+import org.apache.cassandra.sidecar.job.storage.CassandraStorageProvider;
+import org.apache.cassandra.sidecar.job.storage.StorageProvider;
+import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
+import org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys;
+
+/**
+ * Guice module for operational job storage, providing schema registrations
+ * and the {@link CassandraStorageProvider}.
+ * <p>
+ * Three {@link TableSchema} entries are contributed to the schema map binding, keyed by
+ * {@code ClusterOpsSchemaKey}, {@code ClusterOpsNodeStateSchemaKey} and
+ * {@code ActiveClusterOpsSchemaKey}. Each schema is constructed from the service's schema
+ * keyspace configuration and the {@code tablesTtl()} value of the operational job configuration.
+ * <p>
+ * The {@link StorageProvider} binding is a singleton backed by {@link CassandraStorageProvider},
+ * which receives the CQL session provider and the three cluster-ops database accessors.
+ * <p>
+ * NOTE(review): the provider is constructed with {@code null} as its last constructor argument.
+ * The constructor is not visible in this module, so confirm that {@code null} is an accepted
+ * value for that parameter.
+ */
+public class OperationalJobsModule extends AbstractModule
+{
+ @ProvidesIntoMap
+ @KeyClassMapKey(TableSchemaMapKeys.ClusterOpsSchemaKey.class)
+ TableSchema clusterOpsSchema(SidecarConfiguration configuration)
+ {
+ return new
ClusterOpsSchema(configuration.serviceConfiguration().schemaKeyspaceConfiguration(),
+
configuration.operationalJobConfiguration().tablesTtl());
+ }
+
+ @ProvidesIntoMap
+ @KeyClassMapKey(TableSchemaMapKeys.ClusterOpsNodeStateSchemaKey.class)
+ TableSchema clusterOpsNodeStateSchema(SidecarConfiguration configuration)
+ {
+ return new
ClusterOpsNodeStateSchema(configuration.serviceConfiguration().schemaKeyspaceConfiguration(),
+
configuration.operationalJobConfiguration().tablesTtl());
+ }
+
+ @ProvidesIntoMap
+ @KeyClassMapKey(TableSchemaMapKeys.ActiveClusterOpsSchemaKey.class)
+ TableSchema activeClusterOpsSchema(SidecarConfiguration configuration)
+ {
+ return new
ActiveClusterOpsSchema(configuration.serviceConfiguration().schemaKeyspaceConfiguration(),
+
configuration.operationalJobConfiguration().tablesTtl());
+ }
+
+ @Provides
+ @Singleton
+ StorageProvider cassandraStorageProvider(CQLSessionProvider
sessionProvider,
+ ClusterOpsDatabaseAccessor
clusterOpsAccessor,
+
ClusterOpsNodeStateDatabaseAccessor nodeStateAccessor,
+ ActiveClusterOpsDatabaseAccessor
activeOpsAccessor)
+ {
+ return new CassandraStorageProvider(sessionProvider,
+ clusterOpsAccessor,
nodeStateAccessor, activeOpsAccessor, null);
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
index 7361254d..daf59eb7 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
@@ -64,6 +64,7 @@ public class SidecarModules
.add(new LifecycleModule())
.add(new LiveMigrationModule())
.add(new MultiBindingTypeResolverModule())
+ .add(new OperationalJobsModule())
.add(new OpenApiModule())
.add(new RestoreJobModule())
.add(new SchedulingModule())
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
index 13a1ec83..57abe5d4 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
@@ -34,4 +34,7 @@ public interface TableSchemaMapKeys
interface SystemViewsClientsSchemaKey extends ClassKey {}
interface TableHistorySchemaKey extends ClassKey {}
interface CdcStatesSchemaKey extends ClassKey {}
+ interface ClusterOpsSchemaKey extends ClassKey {}
+ interface ClusterOpsNodeStateSchemaKey extends ClassKey {}
+ interface ActiveClusterOpsSchemaKey extends ClassKey {}
}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessorIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessorIntegrationTest.java
new file mode 100644
index 00000000..205dfe28
--- /dev/null
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessorIntegrationTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link ActiveClusterOpsDatabaseAccessor}
+ * <p>
+ * The single test method walks through one sequence of calls against the accessor and asserts:
+ * <ul>
+ * <li>lookups return {@code null} / an empty map before any operation is set;</li>
+ * <li>{@code trySetActiveOperation} returns {@code true} when no operation of that type is
+ * active, and {@code false} when one already is, including a retry with the same ID;</li>
+ * <li>a different operation type can be set while another type is active, and
+ * {@code getActiveOperations} then reports both;</li>
+ * <li>{@code clearActiveOperation} returns {@code true} only when the supplied ID matches the
+ * active one, and {@code false} for a non-matching ID or when repeated after clearing;</li>
+ * <li>after clearing, a new operation of the same type can be set without affecting the
+ * other operation type.</li>
+ * </ul>
+ */
+class ActiveClusterOpsDatabaseAccessorIntegrationTest extends
IntegrationTestBase
+{
+ @CassandraIntegrationTest
+ void testLwtOperations()
+ {
+ waitForSchemaReady(10, TimeUnit.SECONDS);
+
+ ActiveClusterOpsDatabaseAccessor accessor =
injector.getInstance(ActiveClusterOpsDatabaseAccessor.class);
+ Session session = maybeGetSession();
+ String clusterName =
session.getCluster().getMetadata().getClusterName();
+ String datacenter =
session.getState().getConnectedHosts().iterator().next().getDatacenter();
+
+ UUID operationId1 = UUIDs.timeBased();
+ UUID operationId2 = UUIDs.timeBased();
+
+ assertThat(accessor.getActiveOperation(clusterName, datacenter,
OperationType.REPAIR))
+ .withFailMessage("getActiveOperation should return null when
no active operation exists")
+ .isNull();
+ assertThat(accessor.getActiveOperations(clusterName, datacenter))
+ .withFailMessage("getActiveOperations should return an empty
map when no active operation exists")
+ .isEmpty();
+
+ assertThat(accessor)
+ .withFailMessage("trySetActiveOperation should succeed when no
active operation, and getActiveOperation should return the operation")
+ .satisfies(a -> {
+ assertThat(a.trySetActiveOperation(clusterName,
datacenter, OperationType.REPAIR, operationId1)).isTrue();
+ assertThat(a.getActiveOperation(clusterName, datacenter,
OperationType.REPAIR)).isEqualTo(operationId1);
+ });
+
+ assertThat(accessor)
+ .withFailMessage("trySetActiveOperation should fail when an
operation of the same type is active")
+ .satisfies(a -> {
+ assertThat(a.trySetActiveOperation(clusterName,
datacenter, OperationType.REPAIR, operationId2)).isFalse();
+ assertThat(a.getActiveOperation(clusterName, datacenter,
OperationType.REPAIR)).isEqualTo(operationId1);
+ });
+
+ assertThat(accessor.trySetActiveOperation(clusterName, datacenter,
OperationType.REPAIR, operationId1))
+ .withFailMessage("trySetActiveOperation should fail when
retried with same active operation ID")
+ .isFalse();
+
+ UUID decommissionId = UUIDs.timeBased();
+ assertThat(accessor.trySetActiveOperation(clusterName, datacenter,
OperationType.DECOMMISSION, decommissionId))
+ .withFailMessage("trySetActiveOperation should succeed for a
different operation type")
+ .isTrue();
+ assertThat(accessor.getActiveOperations(clusterName, datacenter))
+ .withFailMessage("getActiveOperations should return all
concurrently active operations")
+ .hasSize(2)
+ .containsEntry(OperationType.REPAIR, operationId1)
+ .containsEntry(OperationType.DECOMMISSION, decommissionId);
+
+ assertThat(accessor)
+ .withFailMessage("clearActiveOperation should fail when a
non-matching operation ID is supplied")
+ .satisfies(a -> {
+ assertThat(a.clearActiveOperation(clusterName, datacenter,
OperationType.REPAIR, operationId2)).isFalse();
+ assertThat(a.getActiveOperation(clusterName, datacenter,
OperationType.REPAIR)).isEqualTo(operationId1);
+ });
+
+ assertThat(accessor)
+ .withFailMessage("clearActiveOperation should succeed when the
operation ID matches the active operation")
+ .satisfies(a -> {
+ assertThat(a.clearActiveOperation(clusterName, datacenter,
OperationType.REPAIR, operationId1)).isTrue();
+ assertThat(a.getActiveOperation(clusterName, datacenter,
OperationType.REPAIR)).isNull();
+ });
+
+ assertThat(accessor.clearActiveOperation(clusterName, datacenter,
OperationType.REPAIR, operationId1))
+ .withFailMessage("clearActiveOperation should be a safe no-op
when retried after operation is already cleared")
+ .isFalse();
+
+ assertThat(accessor)
+ .withFailMessage("trySetActiveOperation should succeed after
active operation is cleared, and other operation types should be unaffected")
+ .satisfies(a -> {
+ assertThat(a.trySetActiveOperation(clusterName,
datacenter, OperationType.REPAIR, operationId2)).isTrue();
+ assertThat(a.getActiveOperation(clusterName, datacenter,
OperationType.REPAIR)).isEqualTo(operationId2);
+ assertThat(a.getActiveOperation(clusterName, datacenter,
OperationType.DECOMMISSION)).isEqualTo(decommissionId);
+ });
+ }
+}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessorIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessorIntegrationTest.java
new file mode 100644
index 00000000..b0518fb9
--- /dev/null
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessorIntegrationTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.job.storage.OperationalJobRecord;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link ClusterOpsDatabaseAccessor}
+ * <p>
+ * The single test method exercises one sequence of calls against the accessor and asserts:
+ * <ul>
+ * <li>{@code findJob} returns {@code null} for an ID that was never persisted and
+ * {@code findAllJobs} is empty before anything is written;</li>
+ * <li>a job persisted with node execution order and metadata is read back with those
+ * fields equal, and a job persisted with only ID, type and status is read back with
+ * {@code null} node execution order and metadata;</li>
+ * <li>the first update to {@code RUNNING} yields a non-null start time, a second
+ * {@code RUNNING} update leaves that start time unchanged, and an update to
+ * {@code FAILED} records the supplied failure reason while keeping the start time;</li>
+ * <li>{@code findAllJobs} returns all three jobs when the limit is 10, two when the limit
+ * is 2, and lists the most recently created job first;</li>
+ * <li>persisting again with an existing job ID overwrites the stored status.</li>
+ * </ul>
+ */
+class ClusterOpsDatabaseAccessorIntegrationTest extends IntegrationTestBase
+{
+ @CassandraIntegrationTest
+ void testCrudOperations()
+ {
+ waitForSchemaReady(10, TimeUnit.SECONDS);
+
+ ClusterOpsDatabaseAccessor accessor =
injector.getInstance(ClusterOpsDatabaseAccessor.class);
+ String clusterName =
maybeGetSession().getCluster().getMetadata().getClusterName();
+
+ assertThat(accessor.findJob(clusterName, UUIDs.timeBased()))
+ .withFailMessage("findJob should return null for a job ID that
was never persisted")
+ .isNull();
+
+ assertThat(accessor.findAllJobs(clusterName, 10))
+ .withFailMessage("findAllJobs should return an empty list when
no jobs exist")
+ .isEmpty();
+
+ UUID jobId1 = UUIDs.timeBased();
+ List<List<UUID>> nodeOrder = Arrays.asList(
+ Arrays.asList(UUID.randomUUID(), UUID.randomUUID()),
+ Arrays.asList(UUID.randomUUID())
+ );
+ Map<String, String> metadata = Map.of("key1", "value1", "key2",
"value2");
+ OperationalJobRecord job1 = new OperationalJobRecord(jobId1,
OperationType.REPAIR, OperationalJobStatus.CREATED,
+ null,
Instant.now(), null, nodeOrder, metadata);
+ accessor.persistJob(clusterName, job1);
+
+ OperationalJobRecord found = accessor.findJob(clusterName, jobId1);
+ assertThat(found)
+ .withFailMessage("findJob should return the persisted job with
all fields intact")
+ .isNotNull()
+ .satisfies(job -> {
+ assertThat(job.jobId()).isEqualTo(jobId1);
+
assertThat(job.operationType()).isEqualTo(OperationType.REPAIR);
+
assertThat(job.status()).isEqualTo(OperationalJobStatus.CREATED);
+ assertThat(job.startTime()).isNull();
+ assertThat(job.lastUpdate()).isNotNull();
+ assertThat(job.failureReason()).isNull();
+ assertThat(job.nodeExecutionOrder()).isEqualTo(nodeOrder);
+ assertThat(job.operationMetadata()).isEqualTo(metadata);
+ });
+
+ UUID jobId2 = UUIDs.timeBased();
+ OperationalJobRecord job2 = new OperationalJobRecord(jobId2,
OperationType.DECOMMISSION, OperationalJobStatus.CREATED);
+ accessor.persistJob(clusterName, job2);
+
+ OperationalJobRecord found2 = accessor.findJob(clusterName, jobId2);
+ assertThat(found2)
+ .withFailMessage("findJob should return the persisted job with
null nullable fields preserved")
+ .isNotNull()
+ .satisfies(job -> {
+ assertThat(job.jobId()).isEqualTo(jobId2);
+
assertThat(job.operationType()).isEqualTo(OperationType.DECOMMISSION);
+ assertThat(job.nodeExecutionOrder()).isNull();
+ assertThat(job.operationMetadata()).isNull();
+ });
+
+ Instant beforeRunning =
Instant.now().truncatedTo(java.time.temporal.ChronoUnit.MILLIS);
+ accessor.updateJobStatus(clusterName, jobId1, OperationType.REPAIR,
OperationalJobStatus.RUNNING, null);
+ OperationalJobRecord updated = accessor.findJob(clusterName, jobId1);
+ assertThat(updated)
+ .withFailMessage("findJob should reflect the updated status
while preserving other fields")
+ .isNotNull()
+ .satisfies(job -> {
+
assertThat(job.status()).isEqualTo(OperationalJobStatus.RUNNING);
+ assertThat(job.startTime()).isNotNull();
+
assertThat(job.startTime()).isAfterOrEqualTo(beforeRunning);
+
assertThat(job.lastUpdate()).isAfterOrEqualTo(beforeRunning);
+ assertThat(job.nodeExecutionOrder()).isEqualTo(nodeOrder);
+ });
+
+ Instant firstStartTime = updated.startTime();
+ accessor.updateJobStatus(clusterName, jobId1, OperationType.REPAIR,
OperationalJobStatus.RUNNING, null);
+ OperationalJobRecord afterSecondRunning =
accessor.findJob(clusterName, jobId1);
+ assertThat(afterSecondRunning.startTime())
+ .withFailMessage("start_time should not change on subsequent
RUNNING updates")
+ .isEqualTo(firstStartTime);
+ assertThat(afterSecondRunning.lastUpdate())
+ .withFailMessage("last_update should advance on each status
update")
+ .isAfterOrEqualTo(updated.lastUpdate());
+
+ accessor.updateJobStatus(clusterName, jobId1, OperationType.REPAIR,
OperationalJobStatus.FAILED, "node unreachable");
+ OperationalJobRecord failed = accessor.findJob(clusterName, jobId1);
+ assertThat(failed)
+ .isNotNull()
+ .satisfies(job -> {
+
assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED);
+ assertThat(job.failureReason()).isEqualTo("node
unreachable");
+ assertThat(job.startTime()).isEqualTo(firstStartTime);
+ });
+
+ UUID jobId3 = UUIDs.timeBased();
+ accessor.persistJob(clusterName, new OperationalJobRecord(jobId3,
OperationType.REPAIR, OperationalJobStatus.CREATED));
+ List<OperationalJobRecord> allJobs = accessor.findAllJobs(clusterName,
10);
+ assertThat(allJobs)
+ .withFailMessage("findAllJobs should return every persisted
job when the limit exceeds total count")
+ .hasSize(3);
+
+ List<OperationalJobRecord> limited = accessor.findAllJobs(clusterName,
2);
+ assertThat(limited)
+ .withFailMessage("findAllJobs should truncate results to the
specified limit")
+ .hasSize(2);
+
+ assertThat(allJobs.get(0).jobId())
+ .withFailMessage("findAllJobs should return the most recently
created job first")
+ .isEqualTo(jobId3);
+
+ OperationalJobRecord job1Updated = new OperationalJobRecord(jobId1,
OperationType.REPAIR,
+
OperationalJobStatus.SUCCEEDED,
+ null,
Instant.now(), null, nodeOrder, metadata);
+ accessor.persistJob(clusterName, job1Updated);
+ OperationalJobRecord afterUpsert = accessor.findJob(clusterName,
jobId1);
+ assertThat(afterUpsert)
+ .withFailMessage("findJob should reflect the overwritten
status after upsert with the same job ID")
+ .isNotNull()
+ .satisfies(job -> {
+
assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+ });
+ }
+}
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessorIntegrationTest.java
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessorIntegrationTest.java
new file mode 100644
index 00000000..133b6221
--- /dev/null
+++
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessorIntegrationTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link ClusterOpsNodeStateDatabaseAccessor}
+ * <p>
+ * The single test method exercises one sequence of calls against the accessor and asserts:
+ * <ul>
+ * <li>{@code getNodeStatus} returns {@code null} and {@code getNodeStatusesForOperation}
+ * is empty for an operation with no recorded nodes;</li>
+ * <li>{@code updateNodeStatus} stores a status and a later call for the same node replaces
+ * it;</li>
+ * <li>{@code getNodeStatusesForOperation} returns every node recorded for the operation
+ * with its latest status;</li>
+ * <li>repeating {@code updateNodeStatuses} with the same status leaves the result
+ * unchanged;</li>
+ * <li>statuses written under a different operation ID do not affect the first
+ * operation;</li>
+ * <li>a single {@code updateNodeStatuses} call with 250 node IDs results in all 250 being
+ * readable with the written status.</li>
+ * </ul>
+ */
+class ClusterOpsNodeStateDatabaseAccessorIntegrationTest extends
IntegrationTestBase
+{
+ @CassandraIntegrationTest
+ void testCrudOperations()
+ {
+ waitForSchemaReady(10, TimeUnit.SECONDS);
+
+ ClusterOpsNodeStateDatabaseAccessor accessor =
injector.getInstance(ClusterOpsNodeStateDatabaseAccessor.class);
+ String clusterName =
maybeGetSession().getCluster().getMetadata().getClusterName();
+
+ UUID operationId = UUIDs.timeBased();
+ UUID nodeId1 = UUID.randomUUID();
+ UUID nodeId2 = UUID.randomUUID();
+ UUID nodeId3 = UUID.randomUUID();
+
+ assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+ .withFailMessage("getNodeStatus should return null for a
non-existent node")
+ .isNull();
+
+ assertThat(accessor.getNodeStatusesForOperation(clusterName,
operationId))
+ .withFailMessage("getNodeStatusesForOperation should return an
empty map for a non-existent operation")
+ .isEmpty();
+
+ accessor.updateNodeStatus(clusterName, operationId, nodeId1,
OperationalJobStatus.CREATED);
+ assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+ .withFailMessage("getNodeStatus should return correct node
status after updateNodeStatus")
+ .isEqualTo(OperationalJobStatus.CREATED);
+
+ accessor.updateNodeStatus(clusterName, operationId, nodeId1,
OperationalJobStatus.RUNNING);
+ assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+ .withFailMessage("getNodeStatus should return updated status
after updateNodeStatus overwrites previous status")
+ .isEqualTo(OperationalJobStatus.RUNNING);
+
+ accessor.updateNodeStatus(clusterName, operationId, nodeId2,
OperationalJobStatus.CREATED);
+ accessor.updateNodeStatus(clusterName, operationId, nodeId3,
OperationalJobStatus.CREATED);
+ Map<UUID, OperationalJobStatus> allStatuses =
+ accessor.getNodeStatusesForOperation(clusterName, operationId);
+ assertThat(allStatuses)
+ .withFailMessage("getNodeStatusesForOperation should return
all nodes with their latest statuses")
+ .hasSize(3)
+ .containsEntry(nodeId1, OperationalJobStatus.RUNNING)
+ .containsEntry(nodeId2, OperationalJobStatus.CREATED)
+ .containsEntry(nodeId3, OperationalJobStatus.CREATED);
+
+ accessor.updateNodeStatuses(clusterName, operationId,
+ Arrays.asList(nodeId2, nodeId3),
+ OperationalJobStatus.CREATED);
+ Map<UUID, OperationalJobStatus> afterRetry =
+ accessor.getNodeStatusesForOperation(clusterName, operationId);
+ assertThat(afterRetry)
+ .withFailMessage("updateNodeStatuses should be idempotent")
+ .hasSize(3)
+ .containsEntry(nodeId2, OperationalJobStatus.CREATED)
+ .containsEntry(nodeId3, OperationalJobStatus.CREATED);
+
+ UUID otherOperationId = UUIDs.timeBased();
+ accessor.updateNodeStatus(clusterName, otherOperationId, nodeId1,
OperationalJobStatus.SUCCEEDED);
+ assertThat(accessor.getNodeStatusesForOperation(clusterName,
otherOperationId))
+ .withFailMessage("getNodeStatusesForOperation should only
return nodes for the queried operation")
+ .hasSize(1);
+ assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+ .withFailMessage("getNodeStatus should be unaffected by node
updates in a different operation")
+ .isEqualTo(OperationalJobStatus.RUNNING);
+
+ UUID chunkOperationId = UUIDs.timeBased();
+ List<UUID> nodeIds = new ArrayList<>();
+ for (int i = 0; i < 250; i++)
+ {
+ nodeIds.add(UUID.randomUUID());
+ }
+ accessor.updateNodeStatuses(clusterName, chunkOperationId, nodeIds,
OperationalJobStatus.CREATED);
+ Map<UUID, OperationalJobStatus> chunkStatuses =
+ accessor.getNodeStatusesForOperation(clusterName,
chunkOperationId);
+ assertThat(chunkStatuses)
+ .withFailMessage("getNodeStatusesForOperation should return
all nodes persisted across chunked UNLOGGED batches")
+ .hasSize(250);
+ for (UUID nodeId : nodeIds)
+ {
+ assertThat(chunkStatuses)
+ .withFailMessage("getNodeStatusesForOperation should show
correct status for node %s after chunked batch write", nodeId)
+ .containsEntry(nodeId, OperationalJobStatus.CREATED);
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImplTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImplTest.java
new file mode 100644
index 00000000..09cd0e89
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImplTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import org.junit.jupiter.api.Test;
+
+import
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link OperationalJobConfigurationImpl}
+ * <p>
+ * Covers two cases: a default-constructed configuration reports a {@code tablesTtl()} equal to
+ * the parsed value {@code "90d"}, and building a configuration with a {@code tablesTtl} of
+ * {@code "13d"} throws an {@link IllegalArgumentException} whose message contains
+ * {@code "tablesTtl cannot be less than"}.
+ */
+class OperationalJobConfigurationImplTest
+{
+ @Test
+ void testDefaultTtl()
+ {
+ OperationalJobConfigurationImpl config = new
OperationalJobConfigurationImpl();
+
assertThat(config.tablesTtl()).isEqualTo(SecondBoundConfiguration.parse("90d"));
+ }
+
+ @Test
+ void testTtlBelowMinimumThrows()
+ {
+ assertThatThrownBy(() -> OperationalJobConfigurationImpl.builder()
+
.tablesTtl(SecondBoundConfiguration.parse("13d"))
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("tablesTtl cannot be less than");
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
index 2272df55..1cece57b 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
@@ -54,6 +54,9 @@ import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsNodeStateSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsSchema;
import org.apache.cassandra.sidecar.db.schema.RestoreJobsSchema;
import org.apache.cassandra.sidecar.db.schema.RestoreRangesSchema;
import org.apache.cassandra.sidecar.db.schema.RestoreSlicesSchema;
@@ -133,7 +136,7 @@ public class SidecarSchemaTest
{
sidecarSchemaInitializer.execute(Promise.promise());
loopAssert(10, 500, () -> {
- assertThat(interceptedExecStmts.size()).isEqualTo(9);
+ assertThat(interceptedExecStmts.size()).isEqualTo(12);
assertThat(interceptedExecStmts.get(0)).as("Create keyspace should
be executed the first")
.contains("CREATE KEYSPACE
IF NOT EXISTS sidecar_internal");
assertThat(interceptedExecStmts).as("Create table should be
executed for job table")
@@ -152,6 +155,12 @@ public class SidecarSchemaTest
.anyMatch(stmt ->
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.configs"));
assertThat(interceptedExecStmts).as("Create table should be
executed for sidecar_lease_v1 table")
.anyMatch(stmt ->
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.sidecar_lease_v1"));
+ assertThat(interceptedExecStmts).as("Create table should be
executed for cluster_ops table")
+ .anyMatch(stmt ->
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.cluster_ops"));
+ assertThat(interceptedExecStmts).as("Create table should be
executed for active_cluster_ops table")
+ .anyMatch(stmt ->
stmt.contains("CREATE TABLE IF NOT EXISTS
sidecar_internal.active_cluster_ops"));
+ assertThat(interceptedExecStmts).as("Create table should be
executed for cluster_ops_node_state table")
+ .anyMatch(stmt ->
stmt.contains("CREATE TABLE IF NOT EXISTS
sidecar_internal.cluster_ops_node_state"));
List<String> expectedPrepStatements = Arrays.asList(
"INSERT INTO sidecar_internal.restore_job_v6 ( created_at,
job_id, keyspace_name, table_name, " +
@@ -225,7 +234,23 @@ public class SidecarSchemaTest
"SELECT table_schema FROM sidecar_internal.table_schema_history
WHERE keyspace_name = ? AND table_name = ? AND version = ?",
"INSERT INTO sidecar_internal.cdc_state_v2 (job_id, split, start,
end, state) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP ?",
- "SELECT start, end, state FROM sidecar_internal.cdc_state_v2 WHERE
job_id = ? AND split = ?"
+ "SELECT start, end, state FROM sidecar_internal.cdc_state_v2 WHERE
job_id = ? AND split = ?",
+
+ "INSERT INTO sidecar_internal.cluster_ops ( cluster_name,
operation_id, operation_type, status, start_time, last_update,
failure_reason, node_execution_order, operation_metadata) VALUES (?, ?, ?, ?,
?, ?, ?, ?, ?)",
+ "SELECT cluster_name, operation_id, operation_type, status,
start_time, last_update, failure_reason, node_execution_order,
operation_metadata FROM sidecar_internal.cluster_ops WHERE cluster_name = ? AND
operation_id = ?",
+ "UPDATE sidecar_internal.cluster_ops SET status = ?, last_update =
? WHERE cluster_name = ? AND operation_id = ? AND operation_type = ?",
+ "UPDATE sidecar_internal.cluster_ops SET status = ?, last_update =
?, start_time = ? WHERE cluster_name = ? AND operation_id = ? AND
operation_type = ?",
+ "UPDATE sidecar_internal.cluster_ops SET status = ?, last_update =
?, failure_reason = ? WHERE cluster_name = ? AND operation_id = ? AND
operation_type = ?",
+ "SELECT cluster_name, operation_id, operation_type, status,
start_time, last_update, failure_reason, node_execution_order,
operation_metadata FROM sidecar_internal.cluster_ops WHERE cluster_name = ?
LIMIT ?",
+
+ "INSERT INTO sidecar_internal.active_cluster_ops (cluster_name,
datacenter, operation_type, operation_id) VALUES (?, ?, ?, ?) IF NOT EXISTS",
+ "SELECT operation_type, operation_id FROM
sidecar_internal.active_cluster_ops WHERE cluster_name = ? AND datacenter = ?",
+ "SELECT operation_id FROM sidecar_internal.active_cluster_ops
WHERE cluster_name = ? AND datacenter = ? AND operation_type = ?",
+ "DELETE FROM sidecar_internal.active_cluster_ops WHERE
cluster_name = ? AND datacenter = ? AND operation_type = ? IF operation_id = ?",
+
+ "INSERT INTO sidecar_internal.cluster_ops_node_state (
cluster_name, operation_id, node_id, node_status) VALUES (?, ?, ?, ?)",
+ "SELECT node_status FROM sidecar_internal.cluster_ops_node_state
WHERE cluster_name = ? AND operation_id = ? AND node_id = ?",
+ "SELECT node_id, node_status FROM
sidecar_internal.cluster_ops_node_state WHERE cluster_name = ? AND operation_id
= ?"
);
assertThat(interceptedPrepStmts).as("Intercepted statements match
expected statements")
@@ -238,6 +263,9 @@ public class SidecarSchemaTest
assertTableSchema(sidecarSchema.tableSchema(SidecarLeaseSchema.class),
"sidecar_internal.sidecar_lease_v1");
assertTableSchema(sidecarSchema.tableSchema(SidecarRolePermissionsSchema.class),
"sidecar_internal.role_permissions_v1");
assertTableSchema(sidecarSchema.tableSchema(SystemAuthSchema.class),
"system_auth");
+
assertTableSchema(sidecarSchema.tableSchema(ClusterOpsSchema.class),
"sidecar_internal.cluster_ops");
+
assertTableSchema(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class),
"sidecar_internal.active_cluster_ops");
+
assertTableSchema(sidecarSchema.tableSchema(ClusterOpsNodeStateSchema.class),
"sidecar_internal.cluster_ops_node_state");
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org