This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 402367a5a CASSSIDECAR-372: Define common operational job tracking interface and refactor current operational job tracker (#325)
402367a5a is described below

commit 402367a5aedbdb902d1aeb4d56b493a7176893c4
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Fri Mar 13 20:35:06 2026 -0400

    CASSSIDECAR-372: Define common operational job tracking interface and refactor current operational job tracker (#325)
    
    Patch by Andrés Beck-Ruiz; reviewed by Francisco Guerrero, Shailaja Koppu, Arjun Ashok, Yifan Cai for CASSSIDECAR-372
---
 CHANGES.txt                                        |   1 +
 ...ker.java => InMemoryOperationalJobTracker.java} |  19 +--
 .../sidecar/job/OperationalJobTracker.java         | 129 +++++----------------
 .../sidecar/modules/CassandraOperationsModule.java |   8 ++
 ...java => InMemoryOperationalJobTrackerTest.java} |  99 +++++++++++++++-
 .../sidecar/job/OperationalJobManagerTest.java     |   8 +-
 .../cassandra/sidecar/job/OperationalJobTest.java  |  30 +++++
 .../cassandra/sidecar/job/RepairJobTest.java       |   2 +-
 8 files changed, 181 insertions(+), 115 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index fc9094135..8df6ee722 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
 
 0.3.0
 -----
+ * Define common operational job tracking interface and refactor current operational job tracker (CASSSIDECAR-372)
  * RangeManager should be singleton in CDCModule (CASSSIDECAR-411)
  * CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
  * SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408)
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
similarity index 88%
copy from server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
copy to server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
index 6d87510b3..f37bc7690 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
@@ -38,23 +38,24 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.VisibleForTesting;
 
 /**
- * Tracks and stores the results of long-running jobs running on the sidecar
+ * Local in-memory implementation of {@link OperationalJobTracker}.
+ * Stores jobs in a synchronized LinkedHashMap with TTL-based eviction.
  */
 @Singleton
-public class OperationalJobTracker
+public class InMemoryOperationalJobTracker implements OperationalJobTracker
 {
     public static final long ONE_DAY_TTL = TimeUnit.DAYS.toMillis(1); // todo: 
consider making it configurable
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(OperationalJobTracker.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryOperationalJobTracker.class);
     private final Map<UUID, OperationalJob> map;
 
     @Inject
-    public OperationalJobTracker(ServiceConfiguration serviceConfiguration)
+    public InMemoryOperationalJobTracker(ServiceConfiguration 
serviceConfiguration)
     {
         this(serviceConfiguration.operationalJobTrackerSize());
     }
 
-    public OperationalJobTracker(int initialCapacity)
+    public InMemoryOperationalJobTracker(int initialCapacity)
     {
         map = Collections.synchronizedMap(new LinkedHashMap<UUID, 
OperationalJob>(initialCapacity)
         {
@@ -88,11 +89,13 @@ public class OperationalJobTracker
         });
     }
 
+    @Override
     public OperationalJob computeIfAbsent(UUID key, Function<UUID, 
OperationalJob> mappingFunction)
     {
         return map.computeIfAbsent(key, mappingFunction);
     }
 
+    @Override
     public OperationalJob get(UUID key)
     {
         return map.get(key);
@@ -103,8 +106,9 @@ public class OperationalJobTracker
      *
      * @return an immutable copy of the underlying mapping
      */
+    @Override
     @NotNull
-    Map<UUID, OperationalJob> jobsView()
+    public Map<UUID, OperationalJob> jobsView()
     {
         return Collections.unmodifiableMap(map);
     }
@@ -113,8 +117,9 @@ public class OperationalJobTracker
      * Filters the inflight (created or running) jobs matching the job name 
from the jobsView
      * @return list of inflight jobs being tracked
      */
+    @Override
     @NotNull
-    List<OperationalJob> inflightJobsByOperation(String operation)
+    public List<OperationalJob> inflightJobsByOperation(String operation)
     {
         return jobsView().values()
                          .stream()
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
index 6d87510b3..4d5369542 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
@@ -18,121 +18,56 @@
 
 package org.apache.cassandra.sidecar.job;
 
-import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.VisibleForTesting;
 
 /**
- * Tracks and stores the results of long-running jobs running on the sidecar
+ * Tracks and stores the results of long-running jobs running on the sidecar.
+ * Implementations can use different storage backends (in-memory, persistent, 
etc.)
  */
-@Singleton
-public class OperationalJobTracker
+public interface OperationalJobTracker
 {
-    public static final long ONE_DAY_TTL = TimeUnit.DAYS.toMillis(1); // todo: 
consider making it configurable
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(OperationalJobTracker.class);
-    private final Map<UUID, OperationalJob> map;
-
-    @Inject
-    public OperationalJobTracker(ServiceConfiguration serviceConfiguration)
-    {
-        this(serviceConfiguration.operationalJobTrackerSize());
-    }
-
-    public OperationalJobTracker(int initialCapacity)
-    {
-        map = Collections.synchronizedMap(new LinkedHashMap<UUID, 
OperationalJob>(initialCapacity)
-        {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<UUID, 
OperationalJob> eldest)
-            {
-                // We have reached capacity and the oldest entry is either 
ready for cleanup or stale
-                if (map.size() > initialCapacity)
-                {
-                    OperationalJob job = eldest.getValue();
-                    OperationalJobStatus status = job.status();
-                    if (status.isCompleted() && 
job.isStale(System.currentTimeMillis(), ONE_DAY_TTL))
-                    {
-                        LOGGER.debug("Expiring completed and stale job due to 
job tracker has reached max size. jobId={} status={} createdAt={}",
-                                     job.jobId(), status, job.creationTime());
-                        return true;
-                    }
-                    else
-                    {
-                        LOGGER.warn("Job tracker reached max size, but the 
eldest job is not completed yet. " +
-                                    "Not evicting. jobId={} status={}", 
job.jobId(), status);
-                        // TODO: Optionally trigger cleanup to fetch next 
oldest to evict
-                    }
-                }
-
-                return false;
-            }
-        });
-    }
-
-    public OperationalJob computeIfAbsent(UUID key, Function<UUID, 
OperationalJob> mappingFunction)
-    {
-        return map.computeIfAbsent(key, mappingFunction);
-    }
+    /**
+     * Retrieve a job by its ID, or compute and store it if absent.
+     * <p>
+     * The mapping function is only called if the job does not exist.
+     * This ensures that job creation and scheduling logic is only executed 
once.
+     *
+     * @param jobId the job identifier
+     * @param mappingFunction function to create the job if absent
+     * @return the job (either existing or newly created)
+     */
+    OperationalJob computeIfAbsent(UUID jobId, Function<UUID, OperationalJob> 
mappingFunction);
 
-    public OperationalJob get(UUID key)
-    {
-        return map.get(key);
-    }
+    /**
+     * Retrieve a job by its ID.
+     *
+     * @param jobId the job identifier
+     * @return the job, or null if not found
+     */
+    OperationalJob get(UUID jobId);
 
     /**
-     * Returns an immutable copy of the underlying map, to provide a 
consistent view of the map, minimizing contention
+     * Returns an immutable view of all tracked jobs.
+     * <p>
+     * This provides a consistent snapshot of the jobs being tracked,
+     * minimizing contention with concurrent operations.
      *
-     * @return an immutable copy of the underlying mapping
+     * @return an immutable map of job IDs to jobs
      */
     @NotNull
-    Map<UUID, OperationalJob> jobsView()
-    {
-        return Collections.unmodifiableMap(map);
-    }
+    Map<UUID, OperationalJob> jobsView();
 
     /**
-     * Filters the inflight (created or running) jobs matching the job name 
from the jobsView
-     * @return list of inflight jobs being tracked
+     * Filters inflight (CREATED or RUNNING) jobs matching the operation name.
+     *
+     * @param operation the operation name to filter by
+     * @return list of inflight jobs for the operation
      */
     @NotNull
-    List<OperationalJob> inflightJobsByOperation(String operation)
-    {
-        return jobsView().values()
-                         .stream()
-                         .filter(j -> (j.name().equals(operation)) &&
-                                      (j.status() == 
OperationalJobStatus.RUNNING ||
-                                       j.status() == 
OperationalJobStatus.CREATED))
-                         .collect(Collectors.toList());
-    }
-
-    @VisibleForTesting
-    OperationalJob put(OperationalJob job)
-    {
-        return map.put(job.jobId(), job);
-    }
-
-    @VisibleForTesting
-    int size()
-    {
-        return map.size();
-    }
+    List<OperationalJob> inflightJobsByOperation(String operation);
 }
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
index cc315449f..a2224e5ca 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
@@ -61,6 +61,8 @@ import 
org.apache.cassandra.sidecar.handlers.TokenRangeReplicaMapHandler;
 import org.apache.cassandra.sidecar.handlers.cassandra.NodeSettingsHandler;
 import 
org.apache.cassandra.sidecar.handlers.v2.cassandra.V2NodeSettingsHandler;
 import 
org.apache.cassandra.sidecar.handlers.validations.ValidateTableExistenceHandler;
+import org.apache.cassandra.sidecar.job.InMemoryOperationalJobTracker;
+import org.apache.cassandra.sidecar.job.OperationalJobTracker;
 import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
 import org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys;
 import org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
@@ -77,6 +79,12 @@ import 
org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
 @Path("/")
 public class CassandraOperationsModule extends AbstractModule
 {
+    @Override
+    protected void configure()
+    {
+        
bind(OperationalJobTracker.class).to(InMemoryOperationalJobTracker.class);
+    }
+
     @ProvidesIntoMap
     @KeyClassMapKey(TableSchemaMapKeys.SystemViewsClientsSchemaKey.class)
     TableSchema systemViewsClientsSchema()
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTrackerTest.java
similarity index 60%
rename from server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
rename to server/src/test/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTrackerTest.java
index 1840ab979..cf5e5f590 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTrackerTest.java
@@ -32,17 +32,19 @@ import org.junit.jupiter.api.Test;
 
 import com.datastax.driver.core.utils.UUIDs;
 
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.CREATED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
 import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
 import static 
org.apache.cassandra.sidecar.job.OperationalJobTest.createOperationalJob;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
- * Tests to validate job tracking
+ * Tests to validate InMemoryOperationalJobTracker implementation
  */
-class OperationalJobTrackerTest
+class InMemoryOperationalJobTrackerTest
 {
-    private OperationalJobTracker jobTracker;
+    private InMemoryOperationalJobTracker jobTracker;
     private static final int trackerSize = 3;
 
     OperationalJob job1 = createOperationalJob(SUCCEEDED);
@@ -56,7 +58,7 @@ class OperationalJobTrackerTest
     @BeforeEach
     void setUp()
     {
-        jobTracker = new OperationalJobTracker(trackerSize);
+        jobTracker = new InMemoryOperationalJobTracker(trackerSize);
     }
 
     @Test
@@ -127,8 +129,8 @@ class OperationalJobTrackerTest
     void testConcurrentAccess() throws Exception
     {
         int one = 1;
-        long pastTimestamp = System.currentTimeMillis() - 
OperationalJobTracker.ONE_DAY_TTL - 1000L;
-        OperationalJobTracker tracker = new OperationalJobTracker(one);
+        long pastTimestamp = System.currentTimeMillis() - 
InMemoryOperationalJobTracker.ONE_DAY_TTL - 1000L;
+        InMemoryOperationalJobTracker tracker = new 
InMemoryOperationalJobTracker(one);
         ExecutorService executorService = 
Executors.newFixedThreadPool(trackerSize);
         List<OperationalJob> sortedJobs = IntStream.range(0, trackerSize + 10)
                                                    .boxed()
@@ -142,4 +144,89 @@ class OperationalJobTrackerTest
         .describedAs("Only the last job is kept")
         .isSameAs(sortedJobs.get(sortedJobs.size() - 1));
     }
+
+    @Test
+    void testInflightJobsByOperationIncludesCreatedJobs()
+    {
+        OperationalJob createdJob = createOperationalJob(CREATED);
+        jobTracker.put(createdJob);
+
+        List<OperationalJob> inflightJobs = 
jobTracker.inflightJobsByOperation(createdJob.name());
+
+        assertThat(inflightJobs)
+            .hasSize(1)
+            .containsExactly(createdJob);
+    }
+
+    @Test
+    void testInflightJobsByOperationIncludesRunningJobs()
+    {
+        OperationalJob runningJob = createOperationalJob(RUNNING);
+        jobTracker.put(runningJob);
+
+        List<OperationalJob> inflightJobs = 
jobTracker.inflightJobsByOperation(runningJob.name());
+
+        assertThat(inflightJobs)
+            .hasSize(1)
+            .containsExactly(runningJob);
+    }
+
+    @Test
+    void testInflightJobsByOperationExcludesCompletedJobs()
+    {
+        jobTracker.put(job1); // SUCCEEDED -- should be filtered out
+
+        List<OperationalJob> inflightJobs = 
jobTracker.inflightJobsByOperation(job1.name());
+
+        assertThat(inflightJobs)
+            .describedAs("SUCCEEDED jobs should not be considered inflight")
+            .isEmpty();
+    }
+
+    @Test
+    void testInflightJobsByOperationFiltersByOperationName()
+    {
+        OperationalJob decommissionJob = createOperationalJob("decommission", 
RUNNING);
+        OperationalJob drainJob = createOperationalJob("drain", RUNNING);
+
+        jobTracker.put(decommissionJob);
+        jobTracker.put(drainJob);
+
+        List<OperationalJob> decommissionJobs = 
jobTracker.inflightJobsByOperation(decommissionJob.name());
+        List<OperationalJob> drainJobs = 
jobTracker.inflightJobsByOperation(drainJob.name());
+
+        assertThat(decommissionJobs)
+            .hasSize(1)
+            .containsExactly(decommissionJob);
+        assertThat(drainJobs)
+            .hasSize(1)
+            .containsExactly(drainJob);
+    }
+
+    @Test
+    void testInflightJobsByOperationReturnsMultipleJobsOfSameType()
+    {
+        OperationalJob runningJob = createOperationalJob(RUNNING);
+        OperationalJob createdJob = createOperationalJob(CREATED);
+
+        jobTracker.put(runningJob);
+        jobTracker.put(createdJob);
+
+        List<OperationalJob> inflightJobs = 
jobTracker.inflightJobsByOperation(runningJob.name());
+
+        assertThat(inflightJobs)
+            .describedAs("Multiple jobs of same operation type should be 
allowed")
+            .hasSize(2)
+            .containsExactlyInAnyOrder(runningJob, createdJob);
+    }
+
+    @Test
+    void testInflightJobsByOperationReturnsEmptyListForUnknownOperation()
+    {
+        jobTracker.put(job1);
+
+        List<OperationalJob> inflightJobs = 
jobTracker.inflightJobsByOperation("unknown-operation");
+
+        assertThat(inflightJobs).isEmpty();
+    }
 }
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
index 1b062f8a6..4007b6612 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
@@ -72,7 +72,7 @@ class OperationalJobManagerTest
     @Test
     void testWithNoDownstreamJob() throws InterruptedException
     {
-        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -93,7 +93,7 @@ class OperationalJobManagerTest
     void testWithRunningDownstreamJob() throws InterruptedException
     {
         OperationalJob runningJob = 
OperationalJobTest.createOperationalJob(RUNNING);
-        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -111,7 +111,7 @@ class OperationalJobManagerTest
     void testWithLongRunningJob() throws InterruptedException
     {
         UUID jobId = UUIDs.timeBased();
-        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -135,7 +135,7 @@ class OperationalJobManagerTest
     void testWithFailingJob() throws InterruptedException
     {
         UUID jobId = UUIDs.timeBased();
-        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
         CountDownLatch latch = new CountDownLatch(1);
 
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
index fedd3c391..f3f495b7c 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
@@ -56,6 +56,36 @@ class OperationalJobTest
         return createOperationalJob(UUIDs.timeBased(), jobStatus);
     }
 
+    public static OperationalJob createOperationalJob(String name, 
OperationalJobStatus jobStatus)
+    {
+        return new OperationalJob(UUIDs.timeBased())
+        {
+            @Override
+            protected Future<Void> executeInternal() throws 
OperationalJobException
+            {
+                return Future.succeededFuture();
+            }
+
+            @Override
+            public boolean hasConflict(List<OperationalJob> jobs)
+            {
+                return jobStatus == OperationalJobStatus.RUNNING;
+            }
+
+            @Override
+            public OperationalJobStatus status()
+            {
+                return jobStatus;
+            }
+
+            @Override
+            public String name()
+            {
+                return name;
+            }
+        };
+    }
+
     public static OperationalJob createOperationalJob(UUID jobId, 
OperationalJobStatus jobStatus)
     {
         return new OperationalJob(jobId)
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java
index 12d05e2d3..1eb46601a 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java
@@ -233,7 +233,7 @@ class RepairJobTest
     void testMultipleRepairJobsRunningInParallel() throws Exception
     {
         // Create a job tracker and manager
-        OperationalJobTracker tracker = new OperationalJobTracker(10);
+        OperationalJobTracker tracker = new InMemoryOperationalJobTracker(10);
         OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
 
         // Mock the storage operations


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to