This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 402367a5a CASSSIDECAR-372: Define common operational job tracking
interface and refactor current operational job tracker (#325)
402367a5a is described below
commit 402367a5aedbdb902d1aeb4d56b493a7176893c4
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Fri Mar 13 20:35:06 2026 -0400
CASSSIDECAR-372: Define common operational job tracking interface and
refactor current operational job tracker (#325)
Patch by Andrés Beck-Ruiz; reviewed by Francisco Guerrero, Shailaja Koppu,
Arjun Ashok, Yifan Cai for CASSSIDECAR-372
---
CHANGES.txt | 1 +
...ker.java => InMemoryOperationalJobTracker.java} | 19 +--
.../sidecar/job/OperationalJobTracker.java | 129 +++++----------------
.../sidecar/modules/CassandraOperationsModule.java | 8 ++
...java => InMemoryOperationalJobTrackerTest.java} | 99 +++++++++++++++-
.../sidecar/job/OperationalJobManagerTest.java | 8 +-
.../cassandra/sidecar/job/OperationalJobTest.java | 30 +++++
.../cassandra/sidecar/job/RepairJobTest.java | 2 +-
8 files changed, 181 insertions(+), 115 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index fc9094135..8df6ee722 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
0.3.0
-----
+ * Define common operational job tracking interface and refactor current
operational job tracker (CASSSIDECAR-372)
* RangeManager should be singleton in CDCModule (CASSSIDECAR-411)
* CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
* SchemaStorePublisherFactory should be Injectable in CachingSchemaStore
(CASSSIDECAR-408)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
similarity index 88%
copy from
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
copy to
server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
index 6d87510b3..f37bc7690 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
@@ -38,23 +38,24 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;
/**
- * Tracks and stores the results of long-running jobs running on the sidecar
+ * Local in-memory implementation of {@link OperationalJobTracker}.
+ * Stores jobs in a synchronized LinkedHashMap with TTL-based eviction.
*/
@Singleton
-public class OperationalJobTracker
+public class InMemoryOperationalJobTracker implements OperationalJobTracker
{
public static final long ONE_DAY_TTL = TimeUnit.DAYS.toMillis(1); // todo:
consider making it configurable
- private static final Logger LOGGER =
LoggerFactory.getLogger(OperationalJobTracker.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryOperationalJobTracker.class);
private final Map<UUID, OperationalJob> map;
@Inject
- public OperationalJobTracker(ServiceConfiguration serviceConfiguration)
+ public InMemoryOperationalJobTracker(ServiceConfiguration
serviceConfiguration)
{
this(serviceConfiguration.operationalJobTrackerSize());
}
- public OperationalJobTracker(int initialCapacity)
+ public InMemoryOperationalJobTracker(int initialCapacity)
{
map = Collections.synchronizedMap(new LinkedHashMap<UUID,
OperationalJob>(initialCapacity)
{
@@ -88,11 +89,13 @@ public class OperationalJobTracker
});
}
+ @Override
public OperationalJob computeIfAbsent(UUID key, Function<UUID,
OperationalJob> mappingFunction)
{
return map.computeIfAbsent(key, mappingFunction);
}
+ @Override
public OperationalJob get(UUID key)
{
return map.get(key);
@@ -103,8 +106,9 @@ public class OperationalJobTracker
*
* @return an immutable copy of the underlying mapping
*/
+ @Override
@NotNull
- Map<UUID, OperationalJob> jobsView()
+ public Map<UUID, OperationalJob> jobsView()
{
return Collections.unmodifiableMap(map);
}
@@ -113,8 +117,9 @@ public class OperationalJobTracker
* Filters the inflight (created or running) jobs matching the job name
from the jobsView
* @return list of inflight jobs being tracked
*/
+ @Override
@NotNull
- List<OperationalJob> inflightJobsByOperation(String operation)
+ public List<OperationalJob> inflightJobsByOperation(String operation)
{
return jobsView().values()
.stream()
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
index 6d87510b3..4d5369542 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
@@ -18,121 +18,56 @@
package org.apache.cassandra.sidecar.job;
-import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.VisibleForTesting;
/**
- * Tracks and stores the results of long-running jobs running on the sidecar
+ * Tracks and stores the results of long-running jobs running on the sidecar.
+ * Implementations can use different storage backends (in-memory, persistent,
etc.)
*/
-@Singleton
-public class OperationalJobTracker
+public interface OperationalJobTracker
{
- public static final long ONE_DAY_TTL = TimeUnit.DAYS.toMillis(1); // todo:
consider making it configurable
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(OperationalJobTracker.class);
- private final Map<UUID, OperationalJob> map;
-
- @Inject
- public OperationalJobTracker(ServiceConfiguration serviceConfiguration)
- {
- this(serviceConfiguration.operationalJobTrackerSize());
- }
-
- public OperationalJobTracker(int initialCapacity)
- {
- map = Collections.synchronizedMap(new LinkedHashMap<UUID,
OperationalJob>(initialCapacity)
- {
- /**
- * {@inheritDoc}
- */
- @Override
- protected boolean removeEldestEntry(Map.Entry<UUID,
OperationalJob> eldest)
- {
- // We have reached capacity and the oldest entry is either
ready for cleanup or stale
- if (map.size() > initialCapacity)
- {
- OperationalJob job = eldest.getValue();
- OperationalJobStatus status = job.status();
- if (status.isCompleted() &&
job.isStale(System.currentTimeMillis(), ONE_DAY_TTL))
- {
- LOGGER.debug("Expiring completed and stale job due to
job tracker has reached max size. jobId={} status={} createdAt={}",
- job.jobId(), status, job.creationTime());
- return true;
- }
- else
- {
- LOGGER.warn("Job tracker reached max size, but the
eldest job is not completed yet. " +
- "Not evicting. jobId={} status={}",
job.jobId(), status);
- // TODO: Optionally trigger cleanup to fetch next
oldest to evict
- }
- }
-
- return false;
- }
- });
- }
-
- public OperationalJob computeIfAbsent(UUID key, Function<UUID,
OperationalJob> mappingFunction)
- {
- return map.computeIfAbsent(key, mappingFunction);
- }
+ /**
+ * Retrieve a job by its ID, or compute and store it if absent.
+ * <p>
+ * The mapping function is only called if the job does not exist.
+ * This ensures that job creation and scheduling logic is only executed
once.
+ *
+ * @param jobId the job identifier
+ * @param mappingFunction function to create the job if absent
+ * @return the job (either existing or newly created)
+ */
+ OperationalJob computeIfAbsent(UUID jobId, Function<UUID, OperationalJob>
mappingFunction);
- public OperationalJob get(UUID key)
- {
- return map.get(key);
- }
+ /**
+ * Retrieve a job by its ID.
+ *
+ * @param jobId the job identifier
+ * @return the job, or null if not found
+ */
+ OperationalJob get(UUID jobId);
/**
- * Returns an immutable copy of the underlying map, to provide a
consistent view of the map, minimizing contention
+ * Returns an immutable view of all tracked jobs.
+ * <p>
+ * This provides a consistent snapshot of the jobs being tracked,
+ * minimizing contention with concurrent operations.
*
- * @return an immutable copy of the underlying mapping
+ * @return an immutable map of job IDs to jobs
*/
@NotNull
- Map<UUID, OperationalJob> jobsView()
- {
- return Collections.unmodifiableMap(map);
- }
+ Map<UUID, OperationalJob> jobsView();
/**
- * Filters the inflight (created or running) jobs matching the job name
from the jobsView
- * @return list of inflight jobs being tracked
+ * Filters inflight (CREATED or RUNNING) jobs matching the operation name.
+ *
+ * @param operation the operation name to filter by
+ * @return list of inflight jobs for the operation
*/
@NotNull
- List<OperationalJob> inflightJobsByOperation(String operation)
- {
- return jobsView().values()
- .stream()
- .filter(j -> (j.name().equals(operation)) &&
- (j.status() ==
OperationalJobStatus.RUNNING ||
- j.status() ==
OperationalJobStatus.CREATED))
- .collect(Collectors.toList());
- }
-
- @VisibleForTesting
- OperationalJob put(OperationalJob job)
- {
- return map.put(job.jobId(), job);
- }
-
- @VisibleForTesting
- int size()
- {
- return map.size();
- }
+ List<OperationalJob> inflightJobsByOperation(String operation);
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
index cc315449f..a2224e5ca 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java
@@ -61,6 +61,8 @@ import
org.apache.cassandra.sidecar.handlers.TokenRangeReplicaMapHandler;
import org.apache.cassandra.sidecar.handlers.cassandra.NodeSettingsHandler;
import
org.apache.cassandra.sidecar.handlers.v2.cassandra.V2NodeSettingsHandler;
import
org.apache.cassandra.sidecar.handlers.validations.ValidateTableExistenceHandler;
+import org.apache.cassandra.sidecar.job.InMemoryOperationalJobTracker;
+import org.apache.cassandra.sidecar.job.OperationalJobTracker;
import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
import org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys;
import org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
@@ -77,6 +79,12 @@ import
org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
@Path("/")
public class CassandraOperationsModule extends AbstractModule
{
+ @Override
+ protected void configure()
+ {
+
bind(OperationalJobTracker.class).to(InMemoryOperationalJobTracker.class);
+ }
+
@ProvidesIntoMap
@KeyClassMapKey(TableSchemaMapKeys.SystemViewsClientsSchemaKey.class)
TableSchema systemViewsClientsSchema()
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTrackerTest.java
similarity index 60%
rename from
server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
rename to
server/src/test/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTrackerTest.java
index 1840ab979..cf5e5f590 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTrackerTest.java
@@ -32,17 +32,19 @@ import org.junit.jupiter.api.Test;
import com.datastax.driver.core.utils.UUIDs;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.CREATED;
+import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
import static
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
import static
org.apache.cassandra.sidecar.job.OperationalJobTest.createOperationalJob;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
- * Tests to validate job tracking
+ * Tests to validate InMemoryOperationalJobTracker implementation
*/
-class OperationalJobTrackerTest
+class InMemoryOperationalJobTrackerTest
{
- private OperationalJobTracker jobTracker;
+ private InMemoryOperationalJobTracker jobTracker;
private static final int trackerSize = 3;
OperationalJob job1 = createOperationalJob(SUCCEEDED);
@@ -56,7 +58,7 @@ class OperationalJobTrackerTest
@BeforeEach
void setUp()
{
- jobTracker = new OperationalJobTracker(trackerSize);
+ jobTracker = new InMemoryOperationalJobTracker(trackerSize);
}
@Test
@@ -127,8 +129,8 @@ class OperationalJobTrackerTest
void testConcurrentAccess() throws Exception
{
int one = 1;
- long pastTimestamp = System.currentTimeMillis() -
OperationalJobTracker.ONE_DAY_TTL - 1000L;
- OperationalJobTracker tracker = new OperationalJobTracker(one);
+ long pastTimestamp = System.currentTimeMillis() -
InMemoryOperationalJobTracker.ONE_DAY_TTL - 1000L;
+ InMemoryOperationalJobTracker tracker = new
InMemoryOperationalJobTracker(one);
ExecutorService executorService =
Executors.newFixedThreadPool(trackerSize);
List<OperationalJob> sortedJobs = IntStream.range(0, trackerSize + 10)
.boxed()
@@ -142,4 +144,89 @@ class OperationalJobTrackerTest
.describedAs("Only the last job is kept")
.isSameAs(sortedJobs.get(sortedJobs.size() - 1));
}
+
+ @Test
+ void testInflightJobsByOperationIncludesCreatedJobs()
+ {
+ OperationalJob createdJob = createOperationalJob(CREATED);
+ jobTracker.put(createdJob);
+
+ List<OperationalJob> inflightJobs =
jobTracker.inflightJobsByOperation(createdJob.name());
+
+ assertThat(inflightJobs)
+ .hasSize(1)
+ .containsExactly(createdJob);
+ }
+
+ @Test
+ void testInflightJobsByOperationIncludesRunningJobs()
+ {
+ OperationalJob runningJob = createOperationalJob(RUNNING);
+ jobTracker.put(runningJob);
+
+ List<OperationalJob> inflightJobs =
jobTracker.inflightJobsByOperation(runningJob.name());
+
+ assertThat(inflightJobs)
+ .hasSize(1)
+ .containsExactly(runningJob);
+ }
+
+ @Test
+ void testInflightJobsByOperationExcludesCompletedJobs()
+ {
+ jobTracker.put(job1); // SUCCEEDED -- should be filtered out
+
+ List<OperationalJob> inflightJobs =
jobTracker.inflightJobsByOperation(job1.name());
+
+ assertThat(inflightJobs)
+ .describedAs("SUCCEEDED jobs should not be considered inflight")
+ .isEmpty();
+ }
+
+ @Test
+ void testInflightJobsByOperationFiltersByOperationName()
+ {
+ OperationalJob decommissionJob = createOperationalJob("decommission",
RUNNING);
+ OperationalJob drainJob = createOperationalJob("drain", RUNNING);
+
+ jobTracker.put(decommissionJob);
+ jobTracker.put(drainJob);
+
+ List<OperationalJob> decommissionJobs =
jobTracker.inflightJobsByOperation(decommissionJob.name());
+ List<OperationalJob> drainJobs =
jobTracker.inflightJobsByOperation(drainJob.name());
+
+ assertThat(decommissionJobs)
+ .hasSize(1)
+ .containsExactly(decommissionJob);
+ assertThat(drainJobs)
+ .hasSize(1)
+ .containsExactly(drainJob);
+ }
+
+ @Test
+ void testInflightJobsByOperationReturnsMultipleJobsOfSameType()
+ {
+ OperationalJob runningJob = createOperationalJob(RUNNING);
+ OperationalJob createdJob = createOperationalJob(CREATED);
+
+ jobTracker.put(runningJob);
+ jobTracker.put(createdJob);
+
+ List<OperationalJob> inflightJobs =
jobTracker.inflightJobsByOperation(runningJob.name());
+
+ assertThat(inflightJobs)
+ .describedAs("Multiple jobs of same operation type should be
allowed")
+ .hasSize(2)
+ .containsExactlyInAnyOrder(runningJob, createdJob);
+ }
+
+ @Test
+ void testInflightJobsByOperationReturnsEmptyListForUnknownOperation()
+ {
+ jobTracker.put(job1);
+
+ List<OperationalJob> inflightJobs =
jobTracker.inflightJobsByOperation("unknown-operation");
+
+ assertThat(inflightJobs).isEmpty();
+ }
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
index 1b062f8a6..4007b6612 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
@@ -72,7 +72,7 @@ class OperationalJobManagerTest
@Test
void testWithNoDownstreamJob() throws InterruptedException
{
- OperationalJobTracker tracker = new OperationalJobTracker(4);
+ OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
CountDownLatch latch = new CountDownLatch(1);
@@ -93,7 +93,7 @@ class OperationalJobManagerTest
void testWithRunningDownstreamJob() throws InterruptedException
{
OperationalJob runningJob =
OperationalJobTest.createOperationalJob(RUNNING);
- OperationalJobTracker tracker = new OperationalJobTracker(4);
+ OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
CountDownLatch latch = new CountDownLatch(1);
@@ -111,7 +111,7 @@ class OperationalJobManagerTest
void testWithLongRunningJob() throws InterruptedException
{
UUID jobId = UUIDs.timeBased();
- OperationalJobTracker tracker = new OperationalJobTracker(4);
+ OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
CountDownLatch latch = new CountDownLatch(1);
@@ -135,7 +135,7 @@ class OperationalJobManagerTest
void testWithFailingJob() throws InterruptedException
{
UUID jobId = UUIDs.timeBased();
- OperationalJobTracker tracker = new OperationalJobTracker(4);
+ OperationalJobTracker tracker = new InMemoryOperationalJobTracker(4);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
CountDownLatch latch = new CountDownLatch(1);
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
index fedd3c391..f3f495b7c 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
@@ -56,6 +56,36 @@ class OperationalJobTest
return createOperationalJob(UUIDs.timeBased(), jobStatus);
}
+ public static OperationalJob createOperationalJob(String name,
OperationalJobStatus jobStatus)
+ {
+ return new OperationalJob(UUIDs.timeBased())
+ {
+ @Override
+ protected Future<Void> executeInternal() throws
OperationalJobException
+ {
+ return Future.succeededFuture();
+ }
+
+ @Override
+ public boolean hasConflict(List<OperationalJob> jobs)
+ {
+ return jobStatus == OperationalJobStatus.RUNNING;
+ }
+
+ @Override
+ public OperationalJobStatus status()
+ {
+ return jobStatus;
+ }
+
+ @Override
+ public String name()
+ {
+ return name;
+ }
+ };
+ }
+
public static OperationalJob createOperationalJob(UUID jobId,
OperationalJobStatus jobStatus)
{
return new OperationalJob(jobId)
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java
index 12d05e2d3..1eb46601a 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/RepairJobTest.java
@@ -233,7 +233,7 @@ class RepairJobTest
void testMultipleRepairJobsRunningInParallel() throws Exception
{
// Create a job tracker and manager
- OperationalJobTracker tracker = new OperationalJobTracker(10);
+ OperationalJobTracker tracker = new InMemoryOperationalJobTracker(10);
OperationalJobManager manager = new OperationalJobManager(tracker,
executorPool);
// Mock the storage operations
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]