This is an automated email from the ASF dual-hosted git repository.

pauloricardomg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64bae6bb CASSSIDECAR-373: Define storage provider interface and 
Cassandra implementation to support durable job tracking (#339)
64bae6bb is described below

commit 64bae6bb7e2467b0f61cf88e7f7ed7baf294c78f
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Wed Jun 10 15:31:53 2026 -0400

    CASSSIDECAR-373: Define storage provider interface and Cassandra 
implementation to support durable job tracking (#339)
    
    Patch by Andrés Beck-Ruiz; Reviewed by Isaac Reath, Paulo Motta, Saranya 
Krishnakuma for CASSSIDECAR-373
---
 CHANGES.txt                                        |   1 +
 .../sidecar/common/data/OperationType.java         |  30 +++
 .../config/OperationalJobConfiguration.java        |  32 +++
 .../sidecar/config/SidecarConfiguration.java       |   5 +
 .../yaml/OperationalJobConfigurationImpl.java      |  95 +++++++++
 .../config/yaml/SidecarConfigurationImpl.java      |  17 ++
 .../db/ActiveClusterOpsDatabaseAccessor.java       | 103 +++++++++
 .../sidecar/db/ClusterOpsDatabaseAccessor.java     | 177 ++++++++++++++++
 .../db/ClusterOpsNodeStateDatabaseAccessor.java    | 120 +++++++++++
 .../sidecar/db/schema/ActiveClusterOpsSchema.java  | 139 +++++++++++++
 .../db/schema/ClusterOpsNodeStateSchema.java       | 132 ++++++++++++
 .../sidecar/db/schema/ClusterOpsSchema.java        | 198 ++++++++++++++++++
 .../job/storage/CassandraStorageProvider.java      | 230 +++++++++++++++++++++
 .../sidecar/job/storage/OperationalJobRecord.java  | 191 +++++++++++++++++
 .../sidecar/job/storage/StorageProvider.java       | 204 ++++++++++++++++++
 .../job/storage/StorageProviderException.java      |  52 +++++
 .../sidecar/modules/OperationalJobsModule.java     |  79 +++++++
 .../cassandra/sidecar/modules/SidecarModules.java  |   1 +
 .../modules/multibindings/TableSchemaMapKeys.java  |   3 +
 ...eClusterOpsDatabaseAccessorIntegrationTest.java | 111 ++++++++++
 .../ClusterOpsDatabaseAccessorIntegrationTest.java | 160 ++++++++++++++
 ...psNodeStateDatabaseAccessorIntegrationTest.java | 121 +++++++++++
 .../yaml/OperationalJobConfigurationImplTest.java  |  49 +++++
 .../cassandra/sidecar/db/SidecarSchemaTest.java    |  32 ++-
 24 files changed, 2280 insertions(+), 2 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7bb99b4e..a26611c5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.5.0
 -----
+ * Define storage provider interface and Cassandra implementation to support 
durable job tracking (CASSSIDECAR-373)
  * Validate snapshot name during list snapshot (CASSSIDECAR-461)
 
 0.4.0
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationType.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationType.java
new file mode 100644
index 00000000..479fbc7b
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+/**
+ * Enumerates the types of operational jobs that can be performed on a 
Cassandra cluster.
+ */
+public enum OperationType
+{
+    DECOMMISSION,
+    DRAIN,
+    MOVE,
+    REPAIR
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/OperationalJobConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/OperationalJobConfiguration.java
new file mode 100644
index 00000000..9c20d5ab
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/OperationalJobConfiguration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+
+/**
+ * Configuration for operational jobs managed by Sidecar
+ */
+public interface OperationalJobConfiguration
+{
+    /**
+     * @return the time-to-live for operational job tables
+     */
+    SecondBoundConfiguration tablesTtl();
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index 82f6cda3..8a9fab23 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -120,4 +120,9 @@ public interface SidecarConfiguration
      * @return the configuration for lifecycle management
      */
     LifecycleConfiguration lifecycleConfiguration();
+
+    /**
+     * @return the configuration for operational jobs
+     */
+    OperationalJobConfiguration operationalJobConfiguration();
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImpl.java
new file mode 100644
index 00000000..8cc1ca42
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.OperationalJobConfiguration;
+
+/**
+ * Configuration for operational jobs managed by Sidecar
+ */
+public class OperationalJobConfigurationImpl implements 
OperationalJobConfiguration
+{
+    private static final SecondBoundConfiguration DEFAULT_TABLES_TTL = 
SecondBoundConfiguration.parse("90d");
+    private static final SecondBoundConfiguration MIN_TABLES_TTL = 
SecondBoundConfiguration.parse("14d");
+
+    protected SecondBoundConfiguration tablesTtl;
+
+    public OperationalJobConfigurationImpl()
+    {
+        this(builder());
+    }
+
+    protected OperationalJobConfigurationImpl(Builder builder)
+    {
+        this.tablesTtl = builder.tablesTtl;
+        validate();
+    }
+
+    private void validate()
+    {
+        if (tablesTtl.compareTo(MIN_TABLES_TTL) < 0)
+        {
+            throw new IllegalArgumentException("tablesTtl cannot be less than 
" + MIN_TABLES_TTL);
+        }
+    }
+
+    @Override
+    @JsonProperty(value = "tables_ttl")
+    public SecondBoundConfiguration tablesTtl()
+    {
+        return tablesTtl;
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * {@code OperationalJobConfigurationImpl} builder static inner class.
+     */
+    public static class Builder implements DataObjectBuilder<Builder, 
OperationalJobConfigurationImpl>
+    {
+        private SecondBoundConfiguration tablesTtl = DEFAULT_TABLES_TTL;
+
+        protected Builder()
+        {
+        }
+
+        @Override
+        public Builder self()
+        {
+            return this;
+        }
+
+        public Builder tablesTtl(SecondBoundConfiguration tablesTtl)
+        {
+            return update(b -> b.tablesTtl = tablesTtl);
+        }
+
+        @Override
+        public OperationalJobConfigurationImpl build()
+        {
+            return new OperationalJobConfigurationImpl(this);
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
index ef364cf1..58781bd4 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
@@ -45,6 +45,7 @@ import 
org.apache.cassandra.sidecar.config.InstanceConfiguration;
 import org.apache.cassandra.sidecar.config.LifecycleConfiguration;
 import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
 import org.apache.cassandra.sidecar.config.MetricsConfiguration;
+import org.apache.cassandra.sidecar.config.OperationalJobConfiguration;
 import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
 import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
 import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
@@ -122,6 +123,9 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
     @JsonProperty("lifecycle")
     private LifecycleConfiguration lifecycleConfiguration;
 
+    @JsonProperty("operational_job")
+    private OperationalJobConfiguration operationalJobConfiguration;
+
     public SidecarConfigurationImpl()
     {
         this(builder());
@@ -147,6 +151,7 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         schemaReportingConfiguration = builder.schemaReportingConfiguration;
         liveMigrationConfiguration = builder.liveMigrationConfiguration;
         lifecycleConfiguration = builder.lifecycleConfiguration;
+        operationalJobConfiguration = builder.operationalJobConfiguration;
     }
 
     /**
@@ -327,6 +332,12 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         return lifecycleConfiguration;
     }
 
+    @Override
+    @JsonProperty("operational_job")
+    public OperationalJobConfiguration operationalJobConfiguration()
+    {
+        return operationalJobConfiguration;
+    }
 
     public static SidecarConfigurationImpl readYamlConfiguration(Path 
yamlConfigurationPath) throws IOException
     {
@@ -444,6 +455,7 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         private SchemaReportingConfiguration schemaReportingConfiguration = 
new SchemaReportingConfigurationImpl();
         private LiveMigrationConfiguration liveMigrationConfiguration = new 
LiveMigrationConfigurationImpl();
         private LifecycleConfiguration lifecycleConfiguration = new 
LifecycleConfigurationImpl();
+        private OperationalJobConfiguration operationalJobConfiguration = new 
OperationalJobConfigurationImpl();
 
         protected Builder()
         {
@@ -659,6 +671,11 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
             return update(b -> b.lifecycleConfiguration = 
lifecycleConfiguration);
         }
 
+        public Builder operationalJobConfiguration(OperationalJobConfiguration 
operationalJobConfiguration)
+        {
+            return update(b -> b.operationalJobConfiguration = 
operationalJobConfiguration);
+        }
+
         /**
          * Returns a {@code SidecarConfigurationImpl} built from the 
parameters previously set.
          *
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java
new file mode 100644
index 00000000..0c48138d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code active_cluster_ops} table.
+ * Provides mutual exclusion for active operations via lightweight 
transactions (LWT).
+ */
+@Singleton
+public class ActiveClusterOpsDatabaseAccessor extends 
DatabaseAccessor<ActiveClusterOpsSchema>
+{
+    @Inject
+    public ActiveClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema, 
CQLSessionProvider sessionProvider)
+    {
+        this(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class), 
sessionProvider);
+    }
+
+    @VisibleForTesting
+    public ActiveClusterOpsDatabaseAccessor(ActiveClusterOpsSchema schema, 
CQLSessionProvider sessionProvider)
+    {
+        super(schema, sessionProvider);
+    }
+
+    public boolean trySetActiveOperation(String clusterName, String datacenter,
+                                         OperationType operationType, UUID 
operationId)
+    {
+        BoundStatement statement = tableSchema.trySetActive()
+                                              .bind(clusterName, datacenter, 
operationType.name(), operationId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
+        ResultSet resultSet = execute(statement);
+        return resultSet.wasApplied();
+    }
+
+    @Nullable
+    public UUID getActiveOperation(String clusterName, String datacenter, 
OperationType operationType)
+    {
+        BoundStatement statement = 
tableSchema.getActiveByType().bind(clusterName, datacenter, 
operationType.name());
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        Row row = resultSet.one();
+        return row == null ? null : row.getUUID("operation_id");
+    }
+
+    @NotNull
+    public Map<OperationType, UUID> getActiveOperations(String clusterName, 
String datacenter)
+    {
+        BoundStatement statement = tableSchema.getActive().bind(clusterName, 
datacenter);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        Map<OperationType, UUID> activeOps = new HashMap<>();
+        for (Row row : resultSet)
+        {
+            
activeOps.put(OperationType.valueOf(row.getString("operation_type")), 
row.getUUID("operation_id"));
+        }
+        return activeOps;
+    }
+
+    public boolean clearActiveOperation(String clusterName, String datacenter,
+                                        OperationType operationType, UUID 
operationId)
+    {
+        BoundStatement statement = tableSchema.clearActive()
+                                              .bind(clusterName, datacenter, 
operationType.name(), operationId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
+        ResultSet resultSet = execute(statement);
+        return resultSet.wasApplied();
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessor.java
new file mode 100644
index 00000000..eea6f769
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessor.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.reflect.TypeToken;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.job.storage.OperationalJobRecord;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code cluster_ops} table.
+ */
+@Singleton
+public class ClusterOpsDatabaseAccessor extends 
DatabaseAccessor<ClusterOpsSchema>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterOpsDatabaseAccessor.class);
+    private static final TypeToken<List<List<UUID>>> NODES_ORDER_TYPE = new 
TypeToken<List<List<UUID>>>() {};
+
+    @Inject
+    public ClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema, 
CQLSessionProvider sessionProvider)
+    {
+        this(sidecarSchema.tableSchema(ClusterOpsSchema.class), 
sessionProvider);
+    }
+
+    @VisibleForTesting
+    public ClusterOpsDatabaseAccessor(ClusterOpsSchema schema, 
CQLSessionProvider sessionProvider)
+    {
+        super(schema, sessionProvider);
+    }
+
+    public void persistJob(String clusterName, OperationalJobRecord job)
+    {
+        Date lastUpdate = Date.from(Instant.now());
+        Date startTime = job.startTime() != null ? Date.from(job.startTime()) 
: null;
+        BoundStatement statement = tableSchema.insertJob()
+                                              .bind(clusterName,
+                                                    job.jobId(),
+                                                    job.operationType().name(),
+                                                    job.status().name(),
+                                                    startTime,
+                                                    lastUpdate,
+                                                    job.failureReason(),
+                                                    job.nodeExecutionOrder(),
+                                                    job.operationMetadata());
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        execute(statement);
+    }
+
+    @Nullable
+    public OperationalJobRecord findJob(String clusterName, UUID jobId)
+    {
+        BoundStatement statement = tableSchema.selectJob().bind(clusterName, 
jobId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        Row row = resultSet.one();
+        if (row == null)
+        {
+            return null;
+        }
+        if (!resultSet.isExhausted())
+        {
+            LOGGER.warn("Multiple rows found for operation_id={} in 
cluster={}. Using first match.", jobId, clusterName);
+        }
+        return recordFromRow(row);
+    }
+
+    public void updateJobStatus(String clusterName, UUID jobId, OperationType 
operationType,
+                                OperationalJobStatus status, @Nullable String 
failureReason)
+    {
+        Date lastUpdate = Date.from(Instant.now());
+        BoundStatement statement;
+        if (failureReason != null)
+        {
+            statement = tableSchema.updateStatusWithFailure()
+                                   .bind(status.name(), lastUpdate, 
failureReason,
+                                         clusterName, jobId, 
operationType.name());
+        }
+        else if (status == OperationalJobStatus.RUNNING && 
shouldSetStartTime(clusterName, jobId))
+        {
+            statement = tableSchema.updateStatusWithStartTime()
+                                   .bind(status.name(), lastUpdate, lastUpdate,
+                                         clusterName, jobId, 
operationType.name());
+        }
+        else
+        {
+            statement = tableSchema.updateStatus()
+                                   .bind(status.name(), lastUpdate,
+                                         clusterName, jobId, 
operationType.name());
+        }
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        execute(statement);
+    }
+
+    private boolean shouldSetStartTime(String clusterName, UUID jobId)
+    {
+        OperationalJobRecord existing = findJob(clusterName, jobId);
+        return existing != null && existing.startTime() == null;
+    }
+
+    @NotNull
+    public List<OperationalJobRecord> findAllJobs(String clusterName, int 
limit)
+    {
+        BoundStatement statement = tableSchema.findAllJobs().bind(clusterName, 
limit);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        List<OperationalJobRecord> records = new ArrayList<>();
+        for (Row row : resultSet)
+        {
+            records.add(recordFromRow(row));
+        }
+        return records;
+    }
+
+    private OperationalJobRecord recordFromRow(Row row)
+    {
+        UUID operationId = row.getUUID("operation_id");
+        OperationType operationType = 
OperationType.valueOf(row.getString("operation_type"));
+        OperationalJobStatus status = 
OperationalJobStatus.valueOf(row.getString("status"));
+        Date startTimeDate = row.getTimestamp("start_time");
+        Instant startTime = startTimeDate != null ? startTimeDate.toInstant() 
: null;
+        Date lastUpdateDate = row.getTimestamp("last_update");
+        Instant lastUpdate = lastUpdateDate != null ? 
lastUpdateDate.toInstant() : null;
+        String failureReason = row.getString("failure_reason");
+        List<List<UUID>> nodeExecutionOrder = row.get("node_execution_order", 
NODES_ORDER_TYPE);
+        if (nodeExecutionOrder != null && nodeExecutionOrder.isEmpty())
+        {
+            nodeExecutionOrder = null;
+        }
+        Map<String, String> operationMetadata = 
row.getMap("operation_metadata", String.class, String.class);
+        if (operationMetadata.isEmpty())
+        {
+            operationMetadata = null;
+        }
+        return new OperationalJobRecord(operationId, operationType, status,
+                                        startTime, lastUpdate, failureReason,
+                                        nodeExecutionOrder, operationMetadata);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessor.java
new file mode 100644
index 00000000..fee68e91
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsNodeStateSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code cluster_ops_node_state} table.
+ */
+@Singleton
+public class ClusterOpsNodeStateDatabaseAccessor extends 
DatabaseAccessor<ClusterOpsNodeStateSchema>
+{
+    private static final int DEFAULT_BATCH_CHUNK_SIZE = 100;
+
+    private final int batchChunkSize;
+
+    @Inject
+    public ClusterOpsNodeStateDatabaseAccessor(SidecarSchema sidecarSchema, 
CQLSessionProvider sessionProvider)
+    {
+        this(sidecarSchema.tableSchema(ClusterOpsNodeStateSchema.class), 
sessionProvider, DEFAULT_BATCH_CHUNK_SIZE);
+    }
+
+    @VisibleForTesting
+    public ClusterOpsNodeStateDatabaseAccessor(ClusterOpsNodeStateSchema 
schema, CQLSessionProvider sessionProvider,
+                                               int batchChunkSize)
+    {
+        super(schema, sessionProvider);
+        this.batchChunkSize = batchChunkSize;
+    }
+
+    public void updateNodeStatus(String clusterName, UUID operationId, UUID 
nodeId, OperationalJobStatus nodeStatus)
+    {
+        BoundStatement statement = tableSchema.insertNodeStatus()
+                                              .bind(clusterName, operationId, 
nodeId, nodeStatus.name());
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        execute(statement);
+    }
+
+    public void updateNodeStatuses(String clusterName, UUID operationId,
+                                   List<UUID> nodeIds, OperationalJobStatus 
nodeStatus)
+    {
+        for (int i = 0; i < nodeIds.size(); i += batchChunkSize)
+        {
+            List<UUID> chunk = nodeIds.subList(i, Math.min(i + batchChunkSize, 
nodeIds.size()));
+            BatchStatement batch = new 
BatchStatement(BatchStatement.Type.UNLOGGED);
+            for (UUID nodeId : chunk)
+            {
+                BoundStatement statement = tableSchema.insertNodeStatus()
+                                                      .bind(clusterName, 
operationId, nodeId, nodeStatus.name());
+                batch.add(statement);
+            }
+            batch.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+            execute(batch);
+        }
+    }
+
+    @Nullable
+    public OperationalJobStatus getNodeStatus(String clusterName, UUID 
operationId, UUID nodeId)
+    {
+        BoundStatement statement = 
tableSchema.selectNodeStatus().bind(clusterName, operationId, nodeId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        Row row = resultSet.one();
+        if (row == null)
+        {
+            return null;
+        }
+        return OperationalJobStatus.valueOf(row.getString("node_status"));
+    }
+
+    @NotNull
+    public Map<UUID, OperationalJobStatus> getNodeStatusesForOperation(String 
clusterName, UUID operationId)
+    {
+        BoundStatement statement = 
tableSchema.selectAllNodeStatuses().bind(clusterName, operationId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        Map<UUID, OperationalJobStatus> statuses = new HashMap<>();
+        for (Row row : resultSet)
+        {
+            UUID nodeId = row.getUUID("node_id");
+            OperationalJobStatus status = 
OperationalJobStatus.valueOf(row.getString("node_status"));
+            statuses.put(nodeId, status);
+        }
+        return statuses;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ActiveClusterOpsSchema.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ActiveClusterOpsSchema.java
new file mode 100644
index 00000000..7b0b6bef
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ActiveClusterOpsSchema.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema for the {@code active_cluster_ops} table, which ensures that only 
one active operation of a given type
+ * is running at a time across the cluster. When a Sidecar instance receives a 
request to set an operation to
+ * {@code RUNNING}, it inserts the operation into this table using a 
lightweight transaction (LWT) to guarantee 
+ * mutual exclusion. Sidecar instances also periodically query this table to 
discover when an operation has been 
+ * activated.
+ */
+public class ActiveClusterOpsSchema extends TableSchema
+{
+    private static final String TABLE_NAME = "active_cluster_ops";
+
+    private final SchemaKeyspaceConfiguration keyspaceConfig;
+    private final SecondBoundConfiguration tableTtl;
+
+    private PreparedStatement trySetActive;
+    private PreparedStatement getActive;
+    private PreparedStatement getActiveByType;
+    private PreparedStatement clearActive;
+
+    public ActiveClusterOpsSchema(SchemaKeyspaceConfiguration keyspaceConfig, 
SecondBoundConfiguration tableTtl)
+    {
+        this.keyspaceConfig = keyspaceConfig;
+        this.tableTtl = tableTtl;
+    }
+
+    @Override
+    protected String keyspaceName()
+    {
+        return keyspaceConfig.keyspace();
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return TABLE_NAME;
+    }
+
+    @Override
+    protected String createSchemaStatement()
+    {
+        return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+                             "  cluster_name text," +
+                             "  datacenter text," +
+                             "  operation_type text," +
+                             "  operation_id timeuuid," +
+                             "  PRIMARY KEY ((cluster_name, datacenter), 
operation_type)" +
+                             ") WITH compaction = {'class': 
'LeveledCompactionStrategy'}" +
+                             "  AND default_time_to_live = %s",
+                             keyspaceConfig.keyspace(), TABLE_NAME, 
tableTtl.toSeconds());
+    }
+
+    @Override
+    protected void prepareStatements(@NotNull Session session)
+    {
+        trySetActive = prepare(trySetActive, session, 
CqlLiterals.trySetActive(keyspaceConfig));
+        getActive = prepare(getActive, session, 
CqlLiterals.getActive(keyspaceConfig));
+        getActiveByType = prepare(getActiveByType, session, 
CqlLiterals.getActiveByType(keyspaceConfig));
+        clearActive = prepare(clearActive, session, 
CqlLiterals.clearActive(keyspaceConfig));
+    }
+
+    public PreparedStatement trySetActive()
+    {
+        return trySetActive;
+    }
+
+    public PreparedStatement getActive()
+    {
+        return getActive;
+    }
+
+    public PreparedStatement getActiveByType()
+    {
+        return getActiveByType;
+    }
+
+    public PreparedStatement clearActive()
+    {
+        return clearActive;
+    }
+
+    private static class CqlLiterals
+    {
+        static String trySetActive(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("INSERT INTO %s.%s (cluster_name, datacenter, 
operation_type, operation_id) " +
+                             "VALUES (?, ?, ?, ?) IF NOT EXISTS", config);
+        }
+
+        static String getActive(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("SELECT operation_type, operation_id FROM %s.%s " 
+
+                             "WHERE cluster_name = ? AND datacenter = ?", 
config);
+        }
+
+        static String getActiveByType(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("SELECT operation_id FROM %s.%s " +
+                             "WHERE cluster_name = ? AND datacenter = ? AND 
operation_type = ?", config);
+        }
+
+        static String clearActive(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("DELETE FROM %s.%s " +
+                             "WHERE cluster_name = ? AND datacenter = ? AND 
operation_type = ? " +
+                             "IF operation_id = ?", config);
+        }
+
+        private static String withTable(String format, 
SchemaKeyspaceConfiguration config)
+        {
+            return String.format(format, config.keyspace(), TABLE_NAME);
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsNodeStateSchema.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsNodeStateSchema.java
new file mode 100644
index 00000000..5f7aa0b5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsNodeStateSchema.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema for the {@code cluster_ops_node_state} table, which tracks the 
status of an operation for a given node.
+ * Sidecar instances query this table to check the status of nodes being 
operated on before their local nodes,
+ * enabling distributed coordination of cluster-wide operations.
+ * <p>
+ * The {@code cluster_name} partition key identifies the cluster being 
operated on, allowing a single
+ * Cassandra cluster to store operational job state for multiple managed 
clusters.
+ */
+public class ClusterOpsNodeStateSchema extends TableSchema
+{
+    private static final String TABLE_NAME = "cluster_ops_node_state";
+
+    private final SchemaKeyspaceConfiguration keyspaceConfig;
+    private final SecondBoundConfiguration tableTtl;
+
+    private PreparedStatement insertNodeStatus;
+    private PreparedStatement selectNodeStatus;
+    private PreparedStatement selectAllNodeStatuses;
+
+    public ClusterOpsNodeStateSchema(SchemaKeyspaceConfiguration 
keyspaceConfig, SecondBoundConfiguration tableTtl)
+    {
+        this.keyspaceConfig = keyspaceConfig;
+        this.tableTtl = tableTtl;
+    }
+
+    @Override
+    protected String keyspaceName()
+    {
+        return keyspaceConfig.keyspace();
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return TABLE_NAME;
+    }
+
+    @Override
+    protected String createSchemaStatement()
+    {
+        return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+                             "  cluster_name text," +
+                             "  operation_id timeuuid," +
+                             "  node_id uuid," +
+                             "  node_status text," +
+                             "  PRIMARY KEY ((cluster_name, operation_id), 
node_id)" +
+                             ") WITH compaction = {'class': 
'LeveledCompactionStrategy'}" +
+                             "  AND default_time_to_live = %s",
+                             keyspaceConfig.keyspace(), TABLE_NAME, 
tableTtl.toSeconds());
+    }
+
+    @Override
+    protected void prepareStatements(@NotNull Session session)
+    {
+        insertNodeStatus = prepare(insertNodeStatus, session, 
CqlLiterals.insertNodeStatus(keyspaceConfig));
+        selectNodeStatus = prepare(selectNodeStatus, session, 
CqlLiterals.selectNodeStatus(keyspaceConfig));
+        selectAllNodeStatuses = prepare(selectAllNodeStatuses, session, 
CqlLiterals.selectAllNodeStatuses(keyspaceConfig));
+    }
+
+    public PreparedStatement insertNodeStatus()
+    {
+        return insertNodeStatus;
+    }
+
+    public PreparedStatement selectNodeStatus()
+    {
+        return selectNodeStatus;
+    }
+
+    public PreparedStatement selectAllNodeStatuses()
+    {
+        return selectAllNodeStatuses;
+    }
+
+    private static class CqlLiterals
+    {
+        static String insertNodeStatus(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("INSERT INTO %s.%s (" +
+                             "  cluster_name," +
+                             "  operation_id," +
+                             "  node_id," +
+                             "  node_status" +
+                             ") VALUES (?, ?, ?, ?)", config);
+        }
+
+        static String selectNodeStatus(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("SELECT node_status " +
+                             "FROM %s.%s " +
+                             "WHERE cluster_name = ? AND operation_id = ? AND 
node_id = ?", config);
+        }
+
+        static String selectAllNodeStatuses(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("SELECT node_id, node_status " +
+                             "FROM %s.%s " +
+                             "WHERE cluster_name = ? AND operation_id = ?", 
config);
+        }
+
+        private static String withTable(String format, 
SchemaKeyspaceConfiguration config)
+        {
+            return String.format(format, config.keyspace(), TABLE_NAME);
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsSchema.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsSchema.java
new file mode 100644
index 00000000..46f2b237
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/ClusterOpsSchema.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema for the {@code cluster_ops} table, which persists and tracks active 
and past operational jobs.
+ * When durable operational job storage is used, all operational job APIs 
query from this table to retrieve
+ * job state, including the operation type, status, node execution order, and 
operation metadata.
+ * <p>
+ * The {@code cluster_name} partition key identifies the cluster being 
operated on, allowing a single
+ * Cassandra cluster to store operational job state for multiple managed 
clusters.
+ */
+public class ClusterOpsSchema extends TableSchema
+{
+    private static final String TABLE_NAME = "cluster_ops";
+
+    private final SchemaKeyspaceConfiguration keyspaceConfig;
+    private final SecondBoundConfiguration tableTtl;
+
+    private PreparedStatement insertJob;
+    private PreparedStatement selectJob;
+    private PreparedStatement updateStatus;
+    private PreparedStatement updateStatusWithStartTime;
+    private PreparedStatement updateStatusWithFailure;
+    private PreparedStatement findAllJobs;
+
+    public ClusterOpsSchema(SchemaKeyspaceConfiguration keyspaceConfig, 
SecondBoundConfiguration tableTtl)
+    {
+        this.keyspaceConfig = keyspaceConfig;
+        this.tableTtl = tableTtl;
+    }
+
+    @Override
+    protected String keyspaceName()
+    {
+        return keyspaceConfig.keyspace();
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return TABLE_NAME;
+    }
+
+    @Override
+    protected String createSchemaStatement()
+    {
+        return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+                             "  cluster_name text," +
+                             "  operation_id timeuuid," +
+                             "  operation_type text," +
+                             "  status text," +
+                             "  start_time timestamp," +
+                             "  last_update timestamp," +
+                             "  failure_reason text," +
+                             "  node_execution_order 
frozen<list<frozen<list<uuid>>>>," +
+                             "  operation_metadata frozen<map<text, text>>," +
+                             "  PRIMARY KEY ((cluster_name), operation_id, 
operation_type)" +
+                             ") WITH CLUSTERING ORDER BY (operation_id DESC, 
operation_type ASC)" +
+                             "  AND compaction = {'class': 
'LeveledCompactionStrategy'}" +
+                             "  AND default_time_to_live = %s",
+                             keyspaceConfig.keyspace(), TABLE_NAME, 
tableTtl.toSeconds());
+    }
+
+    @Override
+    protected void prepareStatements(@NotNull Session session)
+    {
+        insertJob = prepare(insertJob, session, 
CqlLiterals.insertJob(keyspaceConfig));
+        selectJob = prepare(selectJob, session, 
CqlLiterals.selectJob(keyspaceConfig));
+        updateStatus = prepare(updateStatus, session, 
CqlLiterals.updateStatus(keyspaceConfig));
+        updateStatusWithStartTime = prepare(updateStatusWithStartTime, 
session, CqlLiterals.updateStatusWithStartTime(keyspaceConfig));
+        updateStatusWithFailure = prepare(updateStatusWithFailure, session, 
CqlLiterals.updateStatusWithFailure(keyspaceConfig));
+        findAllJobs = prepare(findAllJobs, session, 
CqlLiterals.findAllJobs(keyspaceConfig));
+    }
+
+    public PreparedStatement insertJob()
+    {
+        return insertJob;
+    }
+
+    public PreparedStatement selectJob()
+    {
+        return selectJob;
+    }
+
+    public PreparedStatement updateStatus()
+    {
+        return updateStatus;
+    }
+
+    public PreparedStatement updateStatusWithStartTime()
+    {
+        return updateStatusWithStartTime;
+    }
+
+    public PreparedStatement updateStatusWithFailure()
+    {
+        return updateStatusWithFailure;
+    }
+
+    public PreparedStatement findAllJobs()
+    {
+        return findAllJobs;
+    }
+
+    private static class CqlLiterals
+    {
+        static String insertJob(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("INSERT INTO %s.%s (" +
+                             "  cluster_name," +
+                             "  operation_id," +
+                             "  operation_type," +
+                             "  status," +
+                             "  start_time," +
+                             "  last_update," +
+                             "  failure_reason," +
+                             "  node_execution_order," +
+                             "  operation_metadata" +
+                             ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", config);
+        }
+
+        static String selectJob(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("SELECT cluster_name, " +
+                             "operation_id, " +
+                             "operation_type, " +
+                             "status, " +
+                             "start_time, " +
+                             "last_update, " +
+                             "failure_reason, " +
+                             "node_execution_order, " +
+                             "operation_metadata " +
+                             "FROM %s.%s " +
+                             "WHERE cluster_name = ? AND operation_id = ?", 
config);
+        }
+
+        static String updateStatus(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("UPDATE %s.%s SET status = ?, last_update = ? " +
+                             "WHERE cluster_name = ? AND operation_id = ? AND 
operation_type = ?", config);
+        }
+
+        static String updateStatusWithStartTime(SchemaKeyspaceConfiguration 
config)
+        {
+            return withTable("UPDATE %s.%s SET status = ?, last_update = ?, 
start_time = ? " +
+                             "WHERE cluster_name = ? AND operation_id = ? AND 
operation_type = ?", config);
+        }
+
+        static String updateStatusWithFailure(SchemaKeyspaceConfiguration 
config)
+        {
+            return withTable("UPDATE %s.%s SET status = ?, last_update = ?, 
failure_reason = ? " +
+                             "WHERE cluster_name = ? AND operation_id = ? AND 
operation_type = ?", config);
+        }
+
+        static String findAllJobs(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("SELECT cluster_name, " +
+                             "operation_id, " +
+                             "operation_type, " +
+                             "status, " +
+                             "start_time, " +
+                             "last_update, " +
+                             "failure_reason, " +
+                             "node_execution_order, " +
+                             "operation_metadata " +
+                             "FROM %s.%s " +
+                             "WHERE cluster_name = ? LIMIT ?", config);
+        }
+
+        private static String withTable(String format, 
SchemaKeyspaceConfiguration config)
+        {
+            return String.format(format, config.keyspace(), TABLE_NAME);
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/CassandraStorageProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/CassandraStorageProvider.java
new file mode 100644
index 00000000..47781845
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/CassandraStorageProvider.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.ActiveClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsNodeStateDatabaseAccessor;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A {@link StorageProvider} implementation backed by Cassandra system tables.
+ * Delegates to three database accessors for cluster operations, node state, 
and active operation coordination.
+ * Each instance is scoped to a single cluster identified by {@code 
clusterName}.
+ */
+public class CassandraStorageProvider implements StorageProvider
+{
+    private final CQLSessionProvider sessionProvider;
+    private final ClusterOpsDatabaseAccessor clusterOpsAccessor;
+    private final ClusterOpsNodeStateDatabaseAccessor nodeStateAccessor;
+    private final ActiveClusterOpsDatabaseAccessor activeOpsAccessor;
+    private volatile String clusterName;
+    private volatile String datacenter;
+
+    /**
+     * Constructs a CassandraStorageProvider. When {@code clusterName} is 
non-null, it indicates that
+     * operational state is stored in a separate Cassandra cluster from the 
one Sidecar manages. When null,
+     * the cluster name is derived from the CQL session metadata during {@link 
#initialize()}, which is the
+     * default for same-cluster storage.
+     */
+    public CassandraStorageProvider(CQLSessionProvider sessionProvider,
+                                    ClusterOpsDatabaseAccessor 
clusterOpsAccessor,
+                                    ClusterOpsNodeStateDatabaseAccessor 
nodeStateAccessor,
+                                    ActiveClusterOpsDatabaseAccessor 
activeOpsAccessor,
+                                    @Nullable String clusterName)
+    {
+        this.sessionProvider = sessionProvider;
+        this.clusterOpsAccessor = clusterOpsAccessor;
+        this.nodeStateAccessor = nodeStateAccessor;
+        this.activeOpsAccessor = activeOpsAccessor;
+        this.clusterName = clusterName;
+    }
+
+    @Override
+    public void persistJob(OperationalJobRecord job)
+    {
+        execute("persistJob", () -> {
+            clusterOpsAccessor.persistJob(clusterName, job);
+            return null;
+        });
+    }
+
+    @Override
+    @Nullable
+    public OperationalJobRecord findJob(UUID jobId)
+    {
+        return execute("findJob", () -> 
clusterOpsAccessor.findJob(clusterName, jobId));
+    }
+
+    @Override
+    public void updateJobStatus(UUID jobId, OperationType operationType, 
OperationalJobStatus status,
+                                @Nullable String failureReason)
+    {
+        execute("updateJobStatus", () -> {
+            clusterOpsAccessor.updateJobStatus(clusterName, jobId, 
operationType, status, failureReason);
+            return null;
+        });
+    }
+
+    @Override
+    @NotNull
+    public List<OperationalJobRecord> findAllJobs(int limit)
+    {
+        return execute("findAllJobs", () -> 
clusterOpsAccessor.findAllJobs(clusterName, limit));
+    }
+
+    @Override
+    public boolean trySetActiveOperation(OperationType operationType, UUID 
operationId)
+    {
+        return execute("trySetActiveOperation",
+                       () -> 
activeOpsAccessor.trySetActiveOperation(clusterName, datacenter,
+                                                                     
operationType, operationId));
+    }
+
+    @Override
+    @Nullable
+    public UUID getActiveOperation(OperationType operationType)
+    {
+        return execute("getActiveOperation",
+                       () -> activeOpsAccessor.getActiveOperation(clusterName, 
datacenter, operationType));
+    }
+
+    @Override
+    @NotNull
+    public Map<OperationType, UUID> getActiveOperations()
+    {
+        return execute("getActiveOperations",
+                       () -> 
activeOpsAccessor.getActiveOperations(clusterName, datacenter));
+    }
+
+    @Override
+    public boolean clearActiveOperation(OperationType operationType, UUID 
operationId)
+    {
+        return execute("clearActiveOperation",
+                       () -> 
activeOpsAccessor.clearActiveOperation(clusterName, datacenter,
+                                                                    
operationType, operationId));
+    }
+
+    @Override
+    public void updateNodeStatuses(UUID operationId, List<UUID> nodeIds, 
OperationalJobStatus nodeStatus)
+    {
+        execute("updateNodeStatuses", () -> {
+            nodeStateAccessor.updateNodeStatuses(clusterName, operationId, 
nodeIds, nodeStatus);
+            return null;
+        });
+    }
+
+    @Override
+    public void updateNodeStatus(UUID operationId, UUID nodeId, 
OperationalJobStatus nodeStatus)
+    {
+        execute("updateNodeStatus", () -> {
+            nodeStateAccessor.updateNodeStatus(clusterName, operationId, 
nodeId, nodeStatus);
+            return null;
+        });
+    }
+
+    @Override
+    @Nullable
+    public OperationalJobStatus getNodeStatus(UUID operationId, UUID nodeId)
+    {
+        return execute("getNodeStatus",
+                       () -> nodeStateAccessor.getNodeStatus(clusterName, 
operationId, nodeId));
+    }
+
+    @Override
+    @NotNull
+    public Map<UUID, OperationalJobStatus> getNodeStatusesForOperation(UUID 
operationId)
+    {
+        return execute("getNodeStatusesForOperation",
+                       () -> 
nodeStateAccessor.getNodeStatusesForOperation(clusterName, operationId));
+    }
+
+    @Override
+    public void initialize()
+    {
+        // Schema initialization is handled by SidecarSchemaInitializer.
+        // TTL on all tables handles record pruning.
+        try
+        {
+            Session session = sessionProvider.get();
+            if (clusterName == null)
+            {
+                clusterName = 
session.getCluster().getMetadata().getClusterName();
+            }
+            if (datacenter == null)
+            {
+                Collection<Host> connectedHosts = 
session.getState().getConnectedHosts();
+                if (connectedHosts.isEmpty())
+                {
+                    throw new StorageProviderException(
+                    "Failed to resolve local datacenter: no connected hosts 
available");
+                }
+                datacenter = connectedHosts.iterator().next().getDatacenter();
+            }
+        }
+        catch (CassandraUnavailableException e)
+        {
+            throw new StorageProviderException("Failed to initialize storage 
provider", e);
+        }
+    }
+
+    @Override
+    public boolean isAvailable()
+    {
+        return clusterOpsAccessor.isAvailable()
+               && nodeStateAccessor.isAvailable()
+               && activeOpsAccessor.isAvailable();
+    }
+
+    @Override
+    public void close()
+    {
+        // Session lifecycle is managed by the DI container.
+    }
+
+    private <T> T execute(String operation, Supplier<T> action)
+    {
+        if (clusterName == null || datacenter == null)
+        {
+            throw new StorageProviderException("StorageProvider has not been 
initialized. Call initialize() first.");
+        }
+        try
+        {
+            return action.get();
+        }
+        catch (DriverException e)
+        {
+            throw new StorageProviderException("Failed to execute " + 
operation, e);
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/OperationalJobRecord.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/OperationalJobRecord.java
new file mode 100644
index 00000000..7e3d508a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/OperationalJobRecord.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import java.util.UUID;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A data transfer object representing the persisted state of an operational 
job.
+ */
+public class OperationalJobRecord
+{
+    private final UUID jobId;
+    private final OperationType operationType;
+    private final OperationalJobStatus status;
+    private final long creationTimeMillis;
+    @Nullable
+    private final Instant startTime;
+    @Nullable
+    private final Instant lastUpdate;
+    @Nullable
+    private final String failureReason;
+    @Nullable
+    private final List<List<UUID>> nodeExecutionOrder;
+    @Nullable
+    private final Map<String, String> operationMetadata;
+
+    /**
+     * Constructs an OperationalJobRecord with the given fields.
+     *
+     * @param jobId         time-based v1 UUID identifying the job
+     * @param operationType the operation type
+     * @param status        the current status of the job
+     */
+    public OperationalJobRecord(UUID jobId, OperationType operationType, 
OperationalJobStatus status)
+    {
+        this(jobId, operationType, status, null, Instant.now(), null, null, 
null);
+    }
+
+    /**
+     * Constructs an OperationalJobRecord with all fields.
+     *
+     * @param jobId             time-based v1 UUID identifying the job
+     * @param operationType     the operation type
+     * @param status            the current status of the job
+     * @param startTime         the timestamp when execution started, or null 
if not yet started
+     * @param lastUpdate        the timestamp of the last status update, or 
null for pre-existing rows
+     * @param failureReason     the failure reason if the job failed, or null
+     * @param nodeExecutionOrder        the ordered list of parallel node 
groups for execution, or null
+     * @param operationMetadata the operation parameters, or null
+     */
+    public OperationalJobRecord(UUID jobId, OperationType operationType, 
OperationalJobStatus status,
+                                @Nullable Instant startTime,
+                                @Nullable Instant lastUpdate,
+                                @Nullable String failureReason,
+                                @Nullable List<List<UUID>> nodeExecutionOrder,
+                                @Nullable Map<String, String> 
operationMetadata)
+    {
+        Preconditions.checkArgument(jobId != null, "jobId must not be null");
+        Preconditions.checkArgument(jobId.version() == 1, "jobId must be a 
time-based (v1) UUID");
+        Preconditions.checkArgument(operationType != null, "operationType must 
not be null");
+        Preconditions.checkArgument(status != null, "status must not be null");
+        this.jobId = jobId;
+        this.operationType = operationType;
+        this.status = status;
+        this.creationTimeMillis = UUIDs.unixTimestamp(jobId);
+        this.startTime = startTime;
+        this.lastUpdate = lastUpdate;
+        this.failureReason = failureReason;
+        this.nodeExecutionOrder = nodeExecutionOrder;
+        this.operationMetadata = operationMetadata;
+    }
+
+    /**
+     * @return the time-based v1 UUID identifying this job
+     */
+    public UUID jobId()
+    {
+        return jobId;
+    }
+
+    /**
+     * @return the operation type
+     */
+    public OperationType operationType()
+    {
+        return operationType;
+    }
+
+    /**
+     * @return the current status of the job
+     */
+    public OperationalJobStatus status()
+    {
+        return status;
+    }
+
+    /**
+     * @return the unix timestamp in milliseconds when the job was created, 
extracted from the time-based UUID
+     */
+    public long creationTimeMillis()
+    {
+        return creationTimeMillis;
+    }
+
+    /**
+     * @return the timestamp when execution started, or null if not yet started
+     */
+    @Nullable
+    public Instant startTime()
+    {
+        return startTime;
+    }
+
+    /**
+     * @return the timestamp of the last status update, or null for 
pre-existing rows
+     */
+    @Nullable
+    public Instant lastUpdate()
+    {
+        return lastUpdate;
+    }
+
+    /**
+     * @return the failure reason if the job failed, or null otherwise
+     */
+    @Nullable
+    public String failureReason()
+    {
+        return failureReason;
+    }
+
+    /**
+     * @return the ordered list of parallel node groups for execution, or null 
if not set
+     */
+    @Nullable
+    public List<List<UUID>> nodeExecutionOrder()
+    {
+        return nodeExecutionOrder;
+    }
+
+    /**
+     * @return the operation parameters, or null if not set
+     */
+    @Nullable
+    public Map<String, String> operationMetadata()
+    {
+        return operationMetadata;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "OperationalJobRecord{" +
+               "jobId=" + jobId +
+               ", operationType=" + operationType +
+               ", status=" + status +
+               ", creationTimeMillis=" + creationTimeMillis +
+               ", startTime=" + startTime +
+               ", lastUpdate=" + lastUpdate +
+               ", failureReason=" + failureReason +
+               ", nodeExecutionOrder=" + nodeExecutionOrder +
+               ", operationMetadata=" + operationMetadata +
+               '}';
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java
new file mode 100644
index 00000000..0f931bc5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A provider-agnostic storage abstraction for durable operational job state.
+ * Each {@code StorageProvider} instance is scoped to a single cluster and 
datacenter.
+ * The cluster and datacenter identity is configuration, not a per-call 
parameter.
+ * <p>
+ * This interface defines a data access pattern for persisting, modifying, and 
querying OperationalJobs.
+ * Higher-level coordination logic, such as clearing the active operation lock 
when a job reaches
+ * a terminal state, belongs in the layers that depend on this interface.
+ */
+public interface StorageProvider extends Closeable
+{
+    // --- Operational Job Storage ---
+
+    /**
+     * Persist a new operational job record.
+     * <p>
+     * Implementations must provide upsert semantics as a retry safety net: if 
called again with the
+     * same job ID (e.g., due to a network timeout where the caller does not 
know if the first write
+     * succeeded), the existing record should be overwritten rather than 
throwing.
+     * <p>
+     * Implementations should throw {@link StorageProviderException} on write 
failure.
+     *
+     * @param job the job record to store
+     */
+    void persistJob(OperationalJobRecord job);
+
+    /**
+     * Find a job by its ID.
+     *
+     * @param jobId the job identifier
+     * @return the job record, or {@code null} if not found
+     */
+    @Nullable
+    OperationalJobRecord findJob(UUID jobId);
+
+    /**
+     * Update the status of an existing job.
+     * <p>
+     * Implementations should throw {@link StorageProviderException} on write 
failure.
+     *
+     * @param jobId         the job identifier
+     * @param operationType the operation type
+     * @param status        the new status
+     * @param failureReason the failure reason if the job has failed, or 
{@code null} otherwise
+     */
+    void updateJobStatus(UUID jobId, OperationType operationType, 
OperationalJobStatus status,
+                         @Nullable String failureReason);
+
+    /**
+     * Retrieve stored job records, up to the specified limit. Implementations 
should return
+     * records in descending time order. 
+     *
+     * @param limit the maximum number of job records to return
+     * @return list of job records, never null
+     */
+    @NotNull
+    List<OperationalJobRecord> findAllJobs(int limit);
+
+    // --- Active Operation Coordination ---
+
+    /**
+     * Set an operation as active if no other operation of the same type is 
currently active.
+     * <p>
+     * Implementations must provide compare-and-set (CAS) semantics to ensure 
only one active
+     * operation of a given type runs at a time across the cluster. This lock 
is per operation
+     * plan, not per node; within a single operation, multiple nodes may be 
operated on
+     * concurrently as defined by the node execution order.
+     *
+     * @param operationType the operation type
+     * @param operationId   the unique identifier for this operation
+     * @return {@code true} if the operation was successfully set as active, 
{@code false} if
+     *         an operation of the same type is already active (including the 
same operation ID)
+     */
+    boolean trySetActiveOperation(OperationType operationType, UUID 
operationId);
+
+    /**
+     * Get the active operation ID for a given operation type.
+     *
+     * @param operationType the operation type
+     * @return the active operation ID, or {@code null} if no operation of 
this type is active
+     */
+    @Nullable
+    UUID getActiveOperation(OperationType operationType);
+
+    /**
+     * Get all active operations for the local datacenter.
+     *
+     * @return a map of operation type to operation ID for all currently 
active operations
+     *         in the local datacenter, never null
+     */
+    @NotNull
+    Map<OperationType, UUID> getActiveOperations();
+
+    /**
+     * Clear the active operation lock, but only if the provided operation ID 
matches
+     * the currently active one.
+     *
+     * @param operationType the operation type
+     * @param operationId   the operation ID to clear
+     * @return {@code true} if the active operation was cleared, {@code false} 
if the provided
+     *         operation ID did not match the active one
+     */
+    boolean clearActiveOperation(OperationType operationType, UUID 
operationId);
+
+    // --- Node Status Tracking ---
+
+    /**
+     * Update the status of multiple nodes for an operation in a single call.
+     * <p>
+     * Implementations should optimize with batch writes where possible. This 
method is not
+     * guaranteed to be atomic; if any write fails, implementations should 
throw
+     * {@link StorageProviderException}. Callers can safely retry the full 
list since 
+     * writing the same status to a node is idempotent.
+     *
+     * @param operationId the operation identifier
+     * @param nodeIds     the list of node identifiers to update
+     * @param nodeStatus  the status to set for all specified nodes
+     */
+    void updateNodeStatuses(UUID operationId, List<UUID> nodeIds, 
OperationalJobStatus nodeStatus);
+
+    /**
+     * Update a node's status within an operation.
+     * <p>
+     * Implementations should throw {@link StorageProviderException} on write 
failure.
+     *
+     * @param operationId the operation identifier
+     * @param nodeId      the node identifier
+     * @param nodeStatus  the new status for the node
+     */
+    void updateNodeStatus(UUID operationId, UUID nodeId, OperationalJobStatus 
nodeStatus);
+
+    /**
+     * Get a node's current status within an operation.
+     *
+     * @param operationId the operation identifier
+     * @param nodeId      the node identifier
+     * @return the node's current status, or {@code null} if not found
+     */
+    @Nullable
+    OperationalJobStatus getNodeStatus(UUID operationId, UUID nodeId);
+
+    /**
+     * Get all node statuses for an operation.
+     *
+     * @param operationId the operation identifier
+     * @return a map of node IDs to their current status, never null
+     */
+    @NotNull
+    Map<UUID, OperationalJobStatus> getNodeStatusesForOperation(UUID 
operationId);
+
+    // --- Lifecycle ---
+
+    /**
+     * Initialize the storage provider (e.g. schema creation, connection 
setup).
+     * <p>
+     * Implementations must use this method to prepare the StorageProvider for 
accepting users
+     * requests, and ensure that this is idempotent. No other methods should 
be used to do
+     * this activity.
+     * <p>
+     * Callers must invoke this method before calling any other method on this 
interface.
+     * <p>
+     * Implementations must ensure that stored records are eventually pruned. 
The default
+     * Cassandra implementation relies on table-level TTL. Other 
implementations should
+     * establish their own pruning strategy (e.g., scheduled deletes of old 
records).
+     */
+    void initialize();
+
+    /**
+     * Returns whether the provider is ready to accept operations.
+     *
+     * @return {@code true} if the provider is initialized and available, 
{@code false} otherwise
+     */
+    boolean isAvailable();
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProviderException.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProviderException.java
new file mode 100644
index 00000000..1d8d348f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProviderException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+/**
+ * A runtime exception that {@link StorageProvider} implementations should 
wrap their
+ * backend-specific errors in. This gives callers a single exception type to 
catch regardless
+ * of the underlying backend.
+ * <p>
+ * Implementations should catch backend-specific exceptions (e.g., {@code 
DriverException}
+ * for Cassandra, {@code SQLException} for JDBC) and wrap them in {@code 
StorageProviderException}
+ * with the original exception as the cause.
+ */
+public class StorageProviderException extends RuntimeException
+{
+    /**
+     * Constructs a new StorageProviderException with the specified detail 
message.
+     *
+     * @param message the detail message
+     */
+    public StorageProviderException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new StorageProviderException with the specified detail 
message and cause.
+     *
+     * @param message the detail message
+     * @param cause   the underlying cause
+     */
+    public StorageProviderException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/OperationalJobsModule.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/OperationalJobsModule.java
new file mode 100644
index 00000000..820e7c38
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/OperationalJobsModule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.modules;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.ProvidesIntoMap;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.ActiveClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.ClusterOpsNodeStateDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsNodeStateSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.TableSchema;
+import org.apache.cassandra.sidecar.job.storage.CassandraStorageProvider;
+import org.apache.cassandra.sidecar.job.storage.StorageProvider;
+import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
+import org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys;
+
+/**
+ * Guice module for operational job storage, providing schema registrations
+ * and the {@link CassandraStorageProvider}.
+ */
+public class OperationalJobsModule extends AbstractModule
+{
+    @ProvidesIntoMap
+    @KeyClassMapKey(TableSchemaMapKeys.ClusterOpsSchemaKey.class)
+    TableSchema clusterOpsSchema(SidecarConfiguration configuration)
+    {
+        return new 
ClusterOpsSchema(configuration.serviceConfiguration().schemaKeyspaceConfiguration(),
+                                    
configuration.operationalJobConfiguration().tablesTtl());
+    }
+
+    @ProvidesIntoMap
+    @KeyClassMapKey(TableSchemaMapKeys.ClusterOpsNodeStateSchemaKey.class)
+    TableSchema clusterOpsNodeStateSchema(SidecarConfiguration configuration)
+    {
+        return new 
ClusterOpsNodeStateSchema(configuration.serviceConfiguration().schemaKeyspaceConfiguration(),
+                                             
configuration.operationalJobConfiguration().tablesTtl());
+    }
+
+    @ProvidesIntoMap
+    @KeyClassMapKey(TableSchemaMapKeys.ActiveClusterOpsSchemaKey.class)
+    TableSchema activeClusterOpsSchema(SidecarConfiguration configuration)
+    {
+        return new 
ActiveClusterOpsSchema(configuration.serviceConfiguration().schemaKeyspaceConfiguration(),
+                                          
configuration.operationalJobConfiguration().tablesTtl());
+    }
+
+    @Provides
+    @Singleton
+    StorageProvider cassandraStorageProvider(CQLSessionProvider 
sessionProvider,
+                                            ClusterOpsDatabaseAccessor 
clusterOpsAccessor,
+                                            
ClusterOpsNodeStateDatabaseAccessor nodeStateAccessor,
+                                            ActiveClusterOpsDatabaseAccessor 
activeOpsAccessor)
+    {
+        return new CassandraStorageProvider(sessionProvider,
+                                            clusterOpsAccessor, 
nodeStateAccessor, activeOpsAccessor, null);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
index 7361254d..daf59eb7 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/SidecarModules.java
@@ -64,6 +64,7 @@ public class SidecarModules
                .add(new LifecycleModule())
                .add(new LiveMigrationModule())
                .add(new MultiBindingTypeResolverModule())
+               .add(new OperationalJobsModule())
                .add(new OpenApiModule())
                .add(new RestoreJobModule())
                .add(new SchedulingModule())
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
index 13a1ec83..57abe5d4 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
@@ -34,4 +34,7 @@ public interface TableSchemaMapKeys
     interface SystemViewsClientsSchemaKey extends ClassKey {}
     interface TableHistorySchemaKey extends ClassKey {}
     interface CdcStatesSchemaKey extends ClassKey {}
+    interface ClusterOpsSchemaKey extends ClassKey {}
+    interface ClusterOpsNodeStateSchemaKey extends ClassKey {}
+    interface ActiveClusterOpsSchemaKey extends ClassKey {}
 }
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessorIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessorIntegrationTest.java
new file mode 100644
index 00000000..205dfe28
--- /dev/null
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessorIntegrationTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link ActiveClusterOpsDatabaseAccessor}
+ */
+class ActiveClusterOpsDatabaseAccessorIntegrationTest extends 
IntegrationTestBase
+{
+    @CassandraIntegrationTest
+    void testLwtOperations()
+    {
+        waitForSchemaReady(10, TimeUnit.SECONDS);
+
+        ActiveClusterOpsDatabaseAccessor accessor = 
injector.getInstance(ActiveClusterOpsDatabaseAccessor.class);
+        Session session = maybeGetSession();
+        String clusterName = 
session.getCluster().getMetadata().getClusterName();
+        String datacenter = 
session.getState().getConnectedHosts().iterator().next().getDatacenter();
+
+        UUID operationId1 = UUIDs.timeBased();
+        UUID operationId2 = UUIDs.timeBased();
+
+        assertThat(accessor.getActiveOperation(clusterName, datacenter, 
OperationType.REPAIR))
+                .withFailMessage("getActiveOperation should return null when 
no active operation exists")
+                .isNull();
+        assertThat(accessor.getActiveOperations(clusterName, datacenter))
+                .withFailMessage("getActiveOperations should return an empty 
map when no active operation exists")
+                .isEmpty();
+
+        assertThat(accessor)
+                .withFailMessage("trySetActiveOperation should succeed when no 
active operation, and getActiveOperation should return the operation")
+                .satisfies(a -> {
+                    assertThat(a.trySetActiveOperation(clusterName, 
datacenter, OperationType.REPAIR, operationId1)).isTrue();
+                    assertThat(a.getActiveOperation(clusterName, datacenter, 
OperationType.REPAIR)).isEqualTo(operationId1);
+                });
+
+        assertThat(accessor)
+                .withFailMessage("trySetActiveOperation should fail when an 
operation of the same type is active")
+                .satisfies(a -> {
+                    assertThat(a.trySetActiveOperation(clusterName, 
datacenter, OperationType.REPAIR, operationId2)).isFalse();
+                    assertThat(a.getActiveOperation(clusterName, datacenter, 
OperationType.REPAIR)).isEqualTo(operationId1);
+                });
+
+        assertThat(accessor.trySetActiveOperation(clusterName, datacenter, 
OperationType.REPAIR, operationId1))
+                .withFailMessage("trySetActiveOperation should fail when 
retried with same active operation ID")
+                .isFalse();
+
+        UUID decommissionId = UUIDs.timeBased();
+        assertThat(accessor.trySetActiveOperation(clusterName, datacenter, 
OperationType.DECOMMISSION, decommissionId))
+                .withFailMessage("trySetActiveOperation should succeed for a 
different operation type")
+                .isTrue();
+        assertThat(accessor.getActiveOperations(clusterName, datacenter))
+                .withFailMessage("getActiveOperations should return all 
concurrently active operations")
+                .hasSize(2)
+                .containsEntry(OperationType.REPAIR, operationId1)
+                .containsEntry(OperationType.DECOMMISSION, decommissionId);
+
+        assertThat(accessor)
+                .withFailMessage("clearActiveOperation should fail when a 
non-matching operation ID is supplied")
+                .satisfies(a -> {
+                    assertThat(a.clearActiveOperation(clusterName, datacenter, 
OperationType.REPAIR, operationId2)).isFalse();
+                    assertThat(a.getActiveOperation(clusterName, datacenter, 
OperationType.REPAIR)).isEqualTo(operationId1);
+                });
+
+        assertThat(accessor)
+                .withFailMessage("clearActiveOperation should succeed when the 
operation ID matches the active operation")
+                .satisfies(a -> {
+                    assertThat(a.clearActiveOperation(clusterName, datacenter, 
OperationType.REPAIR, operationId1)).isTrue();
+                    assertThat(a.getActiveOperation(clusterName, datacenter, 
OperationType.REPAIR)).isNull();
+                });
+
+        assertThat(accessor.clearActiveOperation(clusterName, datacenter, 
OperationType.REPAIR, operationId1))
+                .withFailMessage("clearActiveOperation should be a safe no-op 
when retried after operation is already cleared")
+                .isFalse();
+
+        assertThat(accessor)
+                .withFailMessage("trySetActiveOperation should succeed after 
active operation is cleared, and other operation types should be unaffected")
+                .satisfies(a -> {
+                    assertThat(a.trySetActiveOperation(clusterName, 
datacenter, OperationType.REPAIR, operationId2)).isTrue();
+                    assertThat(a.getActiveOperation(clusterName, datacenter, 
OperationType.REPAIR)).isEqualTo(operationId2);
+                    assertThat(a.getActiveOperation(clusterName, datacenter, 
OperationType.DECOMMISSION)).isEqualTo(decommissionId);
+                });
+    }
+}
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessorIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessorIntegrationTest.java
new file mode 100644
index 00000000..b0518fb9
--- /dev/null
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsDatabaseAccessorIntegrationTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationType;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.job.storage.OperationalJobRecord;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link ClusterOpsDatabaseAccessor}
+ */
+class ClusterOpsDatabaseAccessorIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest
+    void testCrudOperations()
+    {
+        waitForSchemaReady(10, TimeUnit.SECONDS);
+
+        ClusterOpsDatabaseAccessor accessor = 
injector.getInstance(ClusterOpsDatabaseAccessor.class);
+        String clusterName = 
maybeGetSession().getCluster().getMetadata().getClusterName();
+
+        assertThat(accessor.findJob(clusterName, UUIDs.timeBased()))
+                .withFailMessage("findJob should return null for a job ID that 
was never persisted")
+                .isNull();
+
+        assertThat(accessor.findAllJobs(clusterName, 10))
+                .withFailMessage("findAllJobs should return an empty list when 
no jobs exist")
+                .isEmpty();
+
+        UUID jobId1 = UUIDs.timeBased();
+        List<List<UUID>> nodeOrder = Arrays.asList(
+                Arrays.asList(UUID.randomUUID(), UUID.randomUUID()),
+                Arrays.asList(UUID.randomUUID())
+        );
+        Map<String, String> metadata = Map.of("key1", "value1", "key2", 
"value2");
+        OperationalJobRecord job1 = new OperationalJobRecord(jobId1, 
OperationType.REPAIR, OperationalJobStatus.CREATED,
+                                                             null, 
Instant.now(), null, nodeOrder, metadata);
+        accessor.persistJob(clusterName, job1);
+
+        OperationalJobRecord found = accessor.findJob(clusterName, jobId1);
+        assertThat(found)
+                .withFailMessage("findJob should return the persisted job with 
all fields intact")
+                .isNotNull()
+                .satisfies(job -> {
+                    assertThat(job.jobId()).isEqualTo(jobId1);
+                    
assertThat(job.operationType()).isEqualTo(OperationType.REPAIR);
+                    
assertThat(job.status()).isEqualTo(OperationalJobStatus.CREATED);
+                    assertThat(job.startTime()).isNull();
+                    assertThat(job.lastUpdate()).isNotNull();
+                    assertThat(job.failureReason()).isNull();
+                    assertThat(job.nodeExecutionOrder()).isEqualTo(nodeOrder);
+                    assertThat(job.operationMetadata()).isEqualTo(metadata);
+                });
+
+        UUID jobId2 = UUIDs.timeBased();
+        OperationalJobRecord job2 = new OperationalJobRecord(jobId2, 
OperationType.DECOMMISSION, OperationalJobStatus.CREATED);
+        accessor.persistJob(clusterName, job2);
+
+        OperationalJobRecord found2 = accessor.findJob(clusterName, jobId2);
+        assertThat(found2)
+                .withFailMessage("findJob should return the persisted job with 
null nullable fields preserved")
+                .isNotNull()
+                .satisfies(job -> {
+                    assertThat(job.jobId()).isEqualTo(jobId2);
+                    
assertThat(job.operationType()).isEqualTo(OperationType.DECOMMISSION);
+                    assertThat(job.nodeExecutionOrder()).isNull();
+                    assertThat(job.operationMetadata()).isNull();
+                });
+
+        Instant beforeRunning = 
Instant.now().truncatedTo(java.time.temporal.ChronoUnit.MILLIS);
+        accessor.updateJobStatus(clusterName, jobId1, OperationType.REPAIR, 
OperationalJobStatus.RUNNING, null);
+        OperationalJobRecord updated = accessor.findJob(clusterName, jobId1);
+        assertThat(updated)
+                .withFailMessage("findJob should reflect the updated status 
while preserving other fields")
+                .isNotNull()
+                .satisfies(job -> {
+                    
assertThat(job.status()).isEqualTo(OperationalJobStatus.RUNNING);
+                    assertThat(job.startTime()).isNotNull();
+                    
assertThat(job.startTime()).isAfterOrEqualTo(beforeRunning);
+                    
assertThat(job.lastUpdate()).isAfterOrEqualTo(beforeRunning);
+                    assertThat(job.nodeExecutionOrder()).isEqualTo(nodeOrder);
+                });
+
+        Instant firstStartTime = updated.startTime();
+        accessor.updateJobStatus(clusterName, jobId1, OperationType.REPAIR, 
OperationalJobStatus.RUNNING, null);
+        OperationalJobRecord afterSecondRunning = 
accessor.findJob(clusterName, jobId1);
+        assertThat(afterSecondRunning.startTime())
+                .withFailMessage("start_time should not change on subsequent 
RUNNING updates")
+                .isEqualTo(firstStartTime);
+        assertThat(afterSecondRunning.lastUpdate())
+                .withFailMessage("last_update should advance on each status 
update")
+                .isAfterOrEqualTo(updated.lastUpdate());
+
+        accessor.updateJobStatus(clusterName, jobId1, OperationType.REPAIR, 
OperationalJobStatus.FAILED, "node unreachable");
+        OperationalJobRecord failed = accessor.findJob(clusterName, jobId1);
+        assertThat(failed)
+                .isNotNull()
+                .satisfies(job -> {
+                    
assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED);
+                    assertThat(job.failureReason()).isEqualTo("node 
unreachable");
+                    assertThat(job.startTime()).isEqualTo(firstStartTime);
+                });
+
+        UUID jobId3 = UUIDs.timeBased();
+        accessor.persistJob(clusterName, new OperationalJobRecord(jobId3, 
OperationType.REPAIR, OperationalJobStatus.CREATED));
+        List<OperationalJobRecord> allJobs = accessor.findAllJobs(clusterName, 
10);
+        assertThat(allJobs)
+                .withFailMessage("findAllJobs should return every persisted 
job when the limit exceeds total count")
+                .hasSize(3);
+
+        List<OperationalJobRecord> limited = accessor.findAllJobs(clusterName, 
2);
+        assertThat(limited)
+                .withFailMessage("findAllJobs should truncate results to the 
specified limit")
+                .hasSize(2);
+
+        assertThat(allJobs.get(0).jobId())
+                .withFailMessage("findAllJobs should return the most recently 
created job first")
+                .isEqualTo(jobId3);
+
+        OperationalJobRecord job1Updated = new OperationalJobRecord(jobId1, 
OperationType.REPAIR,
+                                                                     
OperationalJobStatus.SUCCEEDED,
+                                                                     null, 
Instant.now(), null, nodeOrder, metadata);
+        accessor.persistJob(clusterName, job1Updated);
+        OperationalJobRecord afterUpsert = accessor.findJob(clusterName, 
jobId1);
+        assertThat(afterUpsert)
+                .withFailMessage("findJob should reflect the overwritten 
status after upsert with the same job ID")
+                .isNotNull()
+                .satisfies(job -> {
+                    
assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+                });
+    }
+}
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessorIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessorIntegrationTest.java
new file mode 100644
index 00000000..133b6221
--- /dev/null
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/ClusterOpsNodeStateDatabaseAccessorIntegrationTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link ClusterOpsNodeStateDatabaseAccessor}
+ */
+class ClusterOpsNodeStateDatabaseAccessorIntegrationTest extends 
IntegrationTestBase
+{
+    @CassandraIntegrationTest
+    void testCrudOperations()
+    {
+        waitForSchemaReady(10, TimeUnit.SECONDS);
+
+        ClusterOpsNodeStateDatabaseAccessor accessor = 
injector.getInstance(ClusterOpsNodeStateDatabaseAccessor.class);
+        String clusterName = 
maybeGetSession().getCluster().getMetadata().getClusterName();
+
+        UUID operationId = UUIDs.timeBased();
+        UUID nodeId1 = UUID.randomUUID();
+        UUID nodeId2 = UUID.randomUUID();
+        UUID nodeId3 = UUID.randomUUID();
+
+        assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+                .withFailMessage("getNodeStatus should return null for a 
non-existent node")
+                .isNull();
+
+        assertThat(accessor.getNodeStatusesForOperation(clusterName, 
operationId))
+                .withFailMessage("getNodeStatusesForOperation should return an 
empty map for a non-existent operation")
+                .isEmpty();
+
+        accessor.updateNodeStatus(clusterName, operationId, nodeId1, 
OperationalJobStatus.CREATED);
+        assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+                .withFailMessage("getNodeStatus should return correct node 
status after updateNodeStatus")
+                .isEqualTo(OperationalJobStatus.CREATED);
+
+        accessor.updateNodeStatus(clusterName, operationId, nodeId1, 
OperationalJobStatus.RUNNING);
+        assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+                .withFailMessage("getNodeStatus should return updated status 
after updateNodeStatus overwrites previous status")
+                .isEqualTo(OperationalJobStatus.RUNNING);
+
+        accessor.updateNodeStatus(clusterName, operationId, nodeId2, 
OperationalJobStatus.CREATED);
+        accessor.updateNodeStatus(clusterName, operationId, nodeId3, 
OperationalJobStatus.CREATED);
+        Map<UUID, OperationalJobStatus> allStatuses =
+                accessor.getNodeStatusesForOperation(clusterName, operationId);
+        assertThat(allStatuses)
+                .withFailMessage("getNodeStatusesForOperation should return 
all nodes with their latest statuses")
+                .hasSize(3)
+                .containsEntry(nodeId1, OperationalJobStatus.RUNNING)
+                .containsEntry(nodeId2, OperationalJobStatus.CREATED)
+                .containsEntry(nodeId3, OperationalJobStatus.CREATED);
+
+        accessor.updateNodeStatuses(clusterName, operationId,
+                                    Arrays.asList(nodeId2, nodeId3),
+                                    OperationalJobStatus.CREATED);
+        Map<UUID, OperationalJobStatus> afterRetry =
+                accessor.getNodeStatusesForOperation(clusterName, operationId);
+        assertThat(afterRetry)
+                .withFailMessage("updateNodeStatuses should be idempotent")
+                .hasSize(3)
+                .containsEntry(nodeId2, OperationalJobStatus.CREATED)
+                .containsEntry(nodeId3, OperationalJobStatus.CREATED);
+
+        UUID otherOperationId = UUIDs.timeBased();
+        accessor.updateNodeStatus(clusterName, otherOperationId, nodeId1, 
OperationalJobStatus.SUCCEEDED);
+        assertThat(accessor.getNodeStatusesForOperation(clusterName, 
otherOperationId))
+                .withFailMessage("getNodeStatusesForOperation should only 
return nodes for the queried operation")
+                .hasSize(1);
+        assertThat(accessor.getNodeStatus(clusterName, operationId, nodeId1))
+                .withFailMessage("getNodeStatus should be unaffected by node 
updates in a different operation")
+                .isEqualTo(OperationalJobStatus.RUNNING);
+
+        UUID chunkOperationId = UUIDs.timeBased();
+        List<UUID> nodeIds = new ArrayList<>();
+        for (int i = 0; i < 250; i++)
+        {
+            nodeIds.add(UUID.randomUUID());
+        }
+        accessor.updateNodeStatuses(clusterName, chunkOperationId, nodeIds, 
OperationalJobStatus.CREATED);
+        Map<UUID, OperationalJobStatus> chunkStatuses =
+                accessor.getNodeStatusesForOperation(clusterName, 
chunkOperationId);
+        assertThat(chunkStatuses)
+                .withFailMessage("getNodeStatusesForOperation should return 
all nodes persisted across chunked UNLOGGED batches")
+                .hasSize(250);
+        for (UUID nodeId : nodeIds)
+        {
+            assertThat(chunkStatuses)
+                    .withFailMessage("getNodeStatusesForOperation should show 
correct status for node %s after chunked batch write", nodeId)
+                    .containsEntry(nodeId, OperationalJobStatus.CREATED);
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImplTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImplTest.java
new file mode 100644
index 00000000..09cd0e89
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/config/yaml/OperationalJobConfigurationImplTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import org.junit.jupiter.api.Test;
+
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link OperationalJobConfigurationImpl}
+ */
+class OperationalJobConfigurationImplTest
+{
+    @Test
+    void testDefaultTtl()
+    {
+        OperationalJobConfigurationImpl config = new 
OperationalJobConfigurationImpl();
+        
assertThat(config.tablesTtl()).isEqualTo(SecondBoundConfiguration.parse("90d"));
+    }
+
+    @Test
+    void testTtlBelowMinimumThrows()
+    {
+        assertThatThrownBy(() -> OperationalJobConfigurationImpl.builder()
+                                                                
.tablesTtl(SecondBoundConfiguration.parse("13d"))
+                                                                .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("tablesTtl cannot be less than");
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
index 2272df55..1cece57b 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
@@ -54,6 +54,9 @@ import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
 import org.apache.cassandra.sidecar.coordination.ClusterLease;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsNodeStateSchema;
+import org.apache.cassandra.sidecar.db.schema.ClusterOpsSchema;
 import org.apache.cassandra.sidecar.db.schema.RestoreJobsSchema;
 import org.apache.cassandra.sidecar.db.schema.RestoreRangesSchema;
 import org.apache.cassandra.sidecar.db.schema.RestoreSlicesSchema;
@@ -133,7 +136,7 @@ public class SidecarSchemaTest
     {
         sidecarSchemaInitializer.execute(Promise.promise());
         loopAssert(10, 500, () -> {
-            assertThat(interceptedExecStmts.size()).isEqualTo(9);
+            assertThat(interceptedExecStmts.size()).isEqualTo(12);
             assertThat(interceptedExecStmts.get(0)).as("Create keyspace should 
be executed the first")
                                                    .contains("CREATE KEYSPACE 
IF NOT EXISTS sidecar_internal");
             assertThat(interceptedExecStmts).as("Create table should be 
executed for job table")
@@ -152,6 +155,12 @@ public class SidecarSchemaTest
                                             .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.configs"));
             assertThat(interceptedExecStmts).as("Create table should be 
executed for sidecar_lease_v1 table")
                                             .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.sidecar_lease_v1"));
+            assertThat(interceptedExecStmts).as("Create table should be 
executed for cluster_ops table")
+                                            .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.cluster_ops"));
+            assertThat(interceptedExecStmts).as("Create table should be 
executed for active_cluster_ops table")
+                                            .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS 
sidecar_internal.active_cluster_ops"));
+            assertThat(interceptedExecStmts).as("Create table should be 
executed for cluster_ops_node_state table")
+                                            .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS 
sidecar_internal.cluster_ops_node_state"));
 
             List<String> expectedPrepStatements = Arrays.asList(
             "INSERT INTO sidecar_internal.restore_job_v6 (  created_at,  
job_id,  keyspace_name,  table_name,  " +
@@ -225,7 +234,23 @@ public class SidecarSchemaTest
             "SELECT table_schema FROM sidecar_internal.table_schema_history 
WHERE keyspace_name = ? AND table_name = ? AND version = ?",
 
             "INSERT INTO sidecar_internal.cdc_state_v2 (job_id, split, start, 
end, state) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP ?",
-            "SELECT start, end, state FROM sidecar_internal.cdc_state_v2 WHERE 
job_id = ? AND split = ?"
+            "SELECT start, end, state FROM sidecar_internal.cdc_state_v2 WHERE 
job_id = ? AND split = ?",
+
+            "INSERT INTO sidecar_internal.cluster_ops (  cluster_name,  
operation_id,  operation_type,  status,  start_time,  last_update,  
failure_reason,  node_execution_order,  operation_metadata) VALUES (?, ?, ?, ?, 
?, ?, ?, ?, ?)",
+            "SELECT cluster_name, operation_id, operation_type, status, 
start_time, last_update, failure_reason, node_execution_order, 
operation_metadata FROM sidecar_internal.cluster_ops WHERE cluster_name = ? AND 
operation_id = ?",
+            "UPDATE sidecar_internal.cluster_ops SET status = ?, last_update = 
? WHERE cluster_name = ? AND operation_id = ? AND operation_type = ?",
+            "UPDATE sidecar_internal.cluster_ops SET status = ?, last_update = 
?, start_time = ? WHERE cluster_name = ? AND operation_id = ? AND 
operation_type = ?",
+            "UPDATE sidecar_internal.cluster_ops SET status = ?, last_update = 
?, failure_reason = ? WHERE cluster_name = ? AND operation_id = ? AND 
operation_type = ?",
+            "SELECT cluster_name, operation_id, operation_type, status, 
start_time, last_update, failure_reason, node_execution_order, 
operation_metadata FROM sidecar_internal.cluster_ops WHERE cluster_name = ? 
LIMIT ?",
+
+            "INSERT INTO sidecar_internal.active_cluster_ops (cluster_name, 
datacenter, operation_type, operation_id) VALUES (?, ?, ?, ?) IF NOT EXISTS",
+            "SELECT operation_type, operation_id FROM 
sidecar_internal.active_cluster_ops WHERE cluster_name = ? AND datacenter = ?",
+            "SELECT operation_id FROM sidecar_internal.active_cluster_ops 
WHERE cluster_name = ? AND datacenter = ? AND operation_type = ?",
+            "DELETE FROM sidecar_internal.active_cluster_ops WHERE 
cluster_name = ? AND datacenter = ? AND operation_type = ? IF operation_id = ?",
+
+            "INSERT INTO sidecar_internal.cluster_ops_node_state (  
cluster_name,  operation_id,  node_id,  node_status) VALUES (?, ?, ?, ?)",
+            "SELECT node_status FROM sidecar_internal.cluster_ops_node_state 
WHERE cluster_name = ? AND operation_id = ? AND node_id = ?",
+            "SELECT node_id, node_status FROM 
sidecar_internal.cluster_ops_node_state WHERE cluster_name = ? AND operation_id 
= ?"
             );
 
             assertThat(interceptedPrepStmts).as("Intercepted statements match 
expected statements")
@@ -238,6 +263,9 @@ public class SidecarSchemaTest
             
assertTableSchema(sidecarSchema.tableSchema(SidecarLeaseSchema.class), 
"sidecar_internal.sidecar_lease_v1");
             
assertTableSchema(sidecarSchema.tableSchema(SidecarRolePermissionsSchema.class),
 "sidecar_internal.role_permissions_v1");
             
assertTableSchema(sidecarSchema.tableSchema(SystemAuthSchema.class), 
"system_auth");
+            
assertTableSchema(sidecarSchema.tableSchema(ClusterOpsSchema.class), 
"sidecar_internal.cluster_ops");
+            
assertTableSchema(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class), 
"sidecar_internal.active_cluster_ops");
+            
assertTableSchema(sidecarSchema.tableSchema(ClusterOpsNodeStateSchema.class), 
"sidecar_internal.cluster_ops_node_state");
         });
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to