This is an automated email from the ASF dual-hosted git repository.
pauloricardomg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ce96e60 CASSSIDECAR-460: Refactor OperationalJob to have data
separate from execution logic (#349)
3ce96e60 is described below
commit 3ce96e60705d40f9cb914fbc37df87fe45e110ce
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Thu May 21 14:20:15 2026 -0400
CASSSIDECAR-460: Refactor OperationalJob to have data separate from
execution logic (#349)
Patch by Andrés Beck-Ruiz; Reviewed by Arjun Ashok, Francisco Guerrero,
Isaac Reath, Saranya Krishnakumar for CASSSIDECAR-460
---
CHANGES.txt | 1 +
.../sidecar/handlers/OperationalJobHandler.java | 4 +-
.../sidecar/job/InMemoryOperationalJobTracker.java | 2 +-
.../cassandra/sidecar/job/OperationalJob.java | 60 +++++++--
.../cassandra/sidecar/job/OperationalJobInfo.java | 107 ++++++++++++++++
.../sidecar/job/OperationalJobManager.java | 8 +-
.../sidecar/job/OperationalJobTracker.java | 4 +-
.../sidecar/utils/OperationalJobUtils.java | 8 +-
.../handlers/ListOperationalJobsHandlerTest.java | 3 +-
.../handlers/OperationalJobHandlerTest.java | 3 +-
.../sidecar/job/OperationalJobInfoTest.java | 142 +++++++++++++++++++++
11 files changed, 316 insertions(+), 26 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 82e150ff..c6fb4c35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Refactor OperationalJob to have data separate from execution logic (CASSSIDECAR-460)
* Sidecar’s CassandraBridgeFactory FQCN colliding with the Cassandra analytics class (CASSSIDECAR-467)
* Fix ON_CDC_CACHE_WARMED_UP not fired when schema publisher fails (CASSSIDECAR-459)
* Add SidecarReplicationFactorSupplier for live replication factor lookups (CASSSIDECAR-458)
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
index 44925c72..a1367ade 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
@@ -30,7 +30,7 @@ import io.vertx.ext.auth.authorization.Authorization;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
-import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobInfo;
import org.apache.cassandra.sidecar.job.OperationalJobManager;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -75,7 +75,7 @@ public class OperationalJobHandler extends AbstractHandler<UUID> implements Acce
{
executorPools.service()
.executeBlocking(() -> {
- OperationalJob job = jobManager.getJobIfExists(jobId);
+ OperationalJobInfo job = jobManager.getJobIfExists(jobId);
if (job == null)
{
logger.info("No operational job found with the jobId. jobId={}", jobId);
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
index f37bc769..bbdd4777 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
@@ -96,7 +96,7 @@ public class InMemoryOperationalJobTracker implements OperationalJobTracker
}
@Override
- public OperationalJob get(UUID key)
+ public OperationalJobInfo get(UUID key)
{
return map.get(key);
}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
index 8fad806f..58320e4e 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
@@ -42,7 +42,7 @@ import org.jetbrains.annotations.Nullable;
/**
* An abstract class representing operational jobs that run on Cassandra
*/
-public abstract class OperationalJob implements Task<Void>
+public abstract class OperationalJob implements Task<Void>, OperationalJobInfo
{
private static final Logger LOGGER = LoggerFactory.getLogger(OperationalJob.class);
@@ -88,30 +88,37 @@ public abstract class OperationalJob implements Task<Void>
this.nodesFailed = Collections.emptyList();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public UUID jobId()
{
return jobId;
}
/**
- * @return the node UUID associated with this job, or {@code null} if not applicable
+ * {@inheritDoc}
*/
+ @Override
public @Nullable UUID nodeId()
{
return nodeId;
}
/**
- * @return the time this job started execution, or {@code null} if not yet started
+ * {@inheritDoc}
*/
+ @Override
public @Nullable Instant startTime()
{
return startTime;
}
/**
- * @return the time of the last status update for this job, or {@code null} if not yet started
+ * {@inheritDoc}
*/
+ @Override
public @Nullable Instant lastUpdate()
{
return lastUpdate;
@@ -128,8 +135,9 @@ public abstract class OperationalJob implements Task<Void>
}
/**
- * @return the list of nodes pending execution for this job
+ * {@inheritDoc}
*/
+ @Override
@NotNull
public List<UUID> nodesPending()
{
@@ -137,8 +145,9 @@ public abstract class OperationalJob implements Task<Void>
}
/**
- * @return the list of nodes currently executing this job
+ * {@inheritDoc}
*/
+ @Override
@NotNull
public List<UUID> nodesExecuting()
{
@@ -146,8 +155,9 @@ public abstract class OperationalJob implements Task<Void>
}
/**
- * @return the list of nodes that have succeeded executing this job
+ * {@inheritDoc}
*/
+ @Override
@NotNull
public List<UUID> nodesSucceeded()
{
@@ -155,8 +165,9 @@ public abstract class OperationalJob implements Task<Void>
}
/**
- * @return the list of nodes that have failed executing this job
+ * {@inheritDoc}
*/
+ @Override
@NotNull
public List<UUID> nodesFailed()
{
@@ -179,16 +190,18 @@ public abstract class OperationalJob implements Task<Void>
}
/**
- * @return unix timestamp of the job creation time in milliseconds
+ * {@inheritDoc}
*/
+ @Override
public long creationTime()
{
return UUIDs.unixTimestamp(jobId);
}
/**
- * @return whether the operational job is executing or not.
+ * {@inheritDoc}
*/
+ @Override
public boolean isExecuting()
{
return isExecuting;
@@ -216,6 +229,16 @@ public abstract class OperationalJob implements Task<Void>
*/
public abstract boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs);
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String name()
+ {
+ String simpleName = this.getClass().getSimpleName();
+ return simpleName.isEmpty() ? this.getClass().getName() : simpleName;
+ }
+
/**
* Determines the status of the job. OperationalJob subclasses could choose to override the method.
* <p>
@@ -229,6 +252,7 @@ public abstract class OperationalJob implements Task<Void>
*
* @return status of the OperationalJob execution
*/
+ @Override
public OperationalJobStatus status()
{
Future<Void> fut = asyncResult();
@@ -256,6 +280,22 @@ public abstract class OperationalJob implements Task<Void>
return executionPromise.future();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ @Nullable
+ public String failureReason()
+ {
+ Future<Void> fut = asyncResult();
+ if (fut.isComplete() && fut.failed() && fut.cause() != null)
+ {
+ String message = fut.cause().getMessage();
+ return message != null ? message : String.format("Encountered %s during job execution", fut.cause().getClass().getName());
+ }
+ return null;
+ }
+
/**
* Get the async result with waiting for at most the specified wait time
* <p>
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobInfo.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobInfo.java
new file mode 100644
index 00000000..1748e3db
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobInfo.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A read-only view of operational job data. Provides state accessors needed by handlers
+ * and utilities to inspect job status and report results, without exposing execution logic.
+ */
+public interface OperationalJobInfo
+{
+ /**
+ * @return the unique identifier for the job
+ */
+ UUID jobId();
+
+ /**
+ * @return the node UUID associated with the job, or {@code null} for jobs spanning multiple nodes
+ */
+ @Nullable
+ UUID nodeId();
+
+ /**
+ * @return the name of the operation the job performs
+ */
+ String name();
+
+ /**
+ * @return the current status of the job
+ */
+ OperationalJobStatus status();
+
+ /**
+ * @return unix timestamp of the job creation time in milliseconds
+ */
+ long creationTime();
+
+ /**
+ * @return the time this job started execution, or {@code null} if not yet started
+ */
+ @Nullable
+ Instant startTime();
+
+ /**
+ * @return list of node UUIDs pending execution of the job
+ */
+ @NotNull
+ List<UUID> nodesPending();
+
+ /**
+ * @return list of node UUIDs currently executing the job
+ */
+ @NotNull
+ List<UUID> nodesExecuting();
+
+ /**
+ * @return list of node UUIDs that have succeeded executing the job
+ */
+ @NotNull
+ List<UUID> nodesSucceeded();
+
+ /**
+ * @return list of node UUIDs that have failed executing the job
+ */
+ @NotNull
+ List<UUID> nodesFailed();
+
+ /**
+ * @return the time of the last status update, or {@code null} if not yet started
+ */
+ @Nullable
+ Instant lastUpdate();
+
+ /**
+ * @return whether the job is currently executing
+ */
+ boolean isExecuting();
+
+ /**
+ * @return the failure reason if the job has failed, or {@code null} if the job is still in progress or succeeded
+ */
+ @Nullable
+ String failureReason();
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
index 0e70e1e6..ccaed374 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
@@ -61,11 +61,11 @@ public class OperationalJobManager
*
* @return instances of the jobs that are in pending or running states
*/
- public List<OperationalJob> allInflightJobs()
+ public List<OperationalJobInfo> allInflightJobs()
{
return jobTracker.jobsView().values()
.stream()
- .filter(j -> !j.asyncResult().isComplete())
+ .filter(j -> !j.status().isCompleted())
.collect(Collectors.toList());
}
@@ -73,9 +73,9 @@ public class OperationalJobManager
* Fetch the job using its UUID
*
* @param jobId identifier of the job
- * @return instance of the job or null
+ * @return instance of the job info or null
*/
- public OperationalJob getJobIfExists(UUID jobId)
+ public OperationalJobInfo getJobIfExists(UUID jobId)
{
return jobTracker.get(jobId);
}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
index 4d536954..5dcb4283 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
@@ -47,9 +47,9 @@ public interface OperationalJobTracker
* Retrieve a job by its ID.
*
* @param jobId the job identifier
- * @return the job, or null if not found
+ * @return the job info, or null if not found
*/
- OperationalJob get(UUID jobId);
+ OperationalJobInfo get(UUID jobId);
/**
* Returns an immutable view of all tracked jobs.
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
index 52d43262..54c45cd9 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
@@ -26,7 +26,7 @@ import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
-import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobInfo;
import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
@@ -47,7 +47,7 @@ public class OperationalJobUtils
* @param job the operational job to report status on
* @param exception the conflict exception, if any (null if no conflict)
*/
- public static void sendStatusBasedResponse(RoutingContext context, OperationalJob job, OperationalJobConflictException exception)
+ public static void sendStatusBasedResponse(RoutingContext context, OperationalJobInfo job, OperationalJobConflictException exception)
{
if (exception != null)
{
@@ -81,9 +81,9 @@ public class OperationalJobUtils
}
String reason = null;
- if (status == FAILED && job.asyncResult() != null && job.asyncResult().cause() != null)
+ if (status == FAILED)
{
- reason = job.asyncResult().cause().getMessage();
+ reason = job.failureReason();
}
context.json(OperationalJobResponse.builder()
.jobId(job.jobId())
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java
index 5a36bfde..91159bd0 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.sidecar.TestModule;
import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobInfo;
import org.apache.cassandra.sidecar.job.OperationalJobManager;
import org.apache.cassandra.sidecar.modules.SidecarModules;
import org.apache.cassandra.sidecar.server.Server;
@@ -128,7 +129,7 @@ class ListOperationalJobsHandlerTest
@Singleton
public OperationalJobManager jobManager()
{
- List<OperationalJob> testJobs = Arrays.asList(running, running2);
+ List<OperationalJobInfo> testJobs = Arrays.asList(running, running2);
OperationalJobManager mockManager = mock(OperationalJobManager.class);
when(mockManager.allInflightJobs()).thenReturn(testJobs);
return mockManager;
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java
index 23efcfc5..8b4f2ef1 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java
@@ -37,7 +37,6 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.util.Modules;
-import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;
@@ -183,7 +182,7 @@ class OperationalJobHandlerTest
OperationalJob failedMock = mock(OperationalJob.class);
when(failedMock.jobId()).thenReturn(failedUuid);
when(failedMock.status()).thenReturn(OperationalJobStatus.FAILED);
- when(failedMock.asyncResult()).thenReturn(Future.failedFuture("Test failed"));
+ when(failedMock.failureReason()).thenReturn("Test failed");
when(failedMock.name()).thenReturn("testFailed");
when(mockManager.getJobIfExists(runningUuid)).thenReturn(runningMock);
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobInfoTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobInfoTest.java
new file mode 100644
index 00000000..19b3a9eb
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobInfoTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for the {@link OperationalJobInfo} interface contract on {@link OperationalJob}
+ */
+class OperationalJobInfoTest
+{
+ @Test
+ void testFailureReasonReturnsMessageForFailedJob()
+ {
+ String expectedMessage = "something went wrong";
+ OperationalJob job = createFailingJob(expectedMessage);
+ Promise<Void> promise = Promise.promise();
+ job.execute(promise);
+
+ assertThat(job.failureReason()).isEqualTo(expectedMessage);
+ }
+
+ @Test
+ void testFailureReasonReturnsNullForSucceededJob()
+ {
+ OperationalJob job = createSucceedingJob();
+ Promise<Void> promise = Promise.promise();
+ job.execute(promise);
+
+ assertThat(job.asyncResult().succeeded()).isTrue();
+ assertThat(job.failureReason()).isNull();
+ }
+
+ @Test
+ void testFailureReasonReturnsNullForCreatedJob()
+ {
+ OperationalJob job = createSucceedingJob();
+
+ assertThat(job.asyncResult().isComplete()).isFalse();
+ assertThat(job.failureReason()).isNull();
+ }
+
+ @Test
+ void testNameReturnsFullNameForAnonymousClass()
+ {
+ OperationalJob job = createSucceedingJob();
+
+ assertThat(job.name()).isNotEmpty();
+ assertThat(job.name()).contains("OperationalJobInfoTest");
+ }
+
+ @Test
+ void testNameReturnsSimpleNameForNamedClass()
+ {
+ OperationalJob job = new NamedJob(UUIDs.timeBased());
+
+ assertThat(job.name()).isEqualTo("NamedJob");
+ }
+
+ static class NamedJob extends OperationalJob
+ {
+ NamedJob(UUID jobId)
+ {
+ super(jobId);
+ }
+
+ @Override
+ public boolean hasConflict(List<OperationalJob> sameOperationJobs)
+ {
+ return false;
+ }
+
+ @Override
+ protected Future<Void> executeInternal()
+ {
+ return Future.succeededFuture();
+ }
+ }
+
+ private static OperationalJob createFailingJob(String failureMessage)
+ {
+ return new OperationalJob(UUIDs.timeBased())
+ {
+ @Override
+ public boolean hasConflict(List<OperationalJob> sameOperationJobs)
+ {
+ return false;
+ }
+
+ @Override
+ protected Future<Void> executeInternal() throws OperationalJobException
+ {
+ throw new OperationalJobException(failureMessage);
+ }
+
+ };
+ }
+
+ private static OperationalJob createSucceedingJob()
+ {
+ return new OperationalJob(UUIDs.timeBased())
+ {
+ @Override
+ public boolean hasConflict(List<OperationalJob> sameOperationJobs)
+ {
+ return false;
+ }
+
+ @Override
+ protected Future<Void> executeInternal()
+ {
+ return Future.succeededFuture();
+ }
+ };
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]