This is an automated email from the ASF dual-hosted git repository.

pauloricardomg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ce96e60 CASSSIDECAR-460: Refactor OperationalJob to have data separate from execution logic (#349)
3ce96e60 is described below

commit 3ce96e60705d40f9cb914fbc37df87fe45e110ce
Author: Andrés Beck-Ruiz <[email protected]>
AuthorDate: Thu May 21 14:20:15 2026 -0400

    CASSSIDECAR-460: Refactor OperationalJob to have data separate from execution logic (#349)
    
    Patch by Andrés Beck-Ruiz; Reviewed by Arjun Ashok, Francisco Guerrero, Isaac Reath, Saranya Krishnakumar for CASSSIDECAR-460
---
 CHANGES.txt                                        |   1 +
 .../sidecar/handlers/OperationalJobHandler.java    |   4 +-
 .../sidecar/job/InMemoryOperationalJobTracker.java |   2 +-
 .../cassandra/sidecar/job/OperationalJob.java      |  60 +++++++--
 .../cassandra/sidecar/job/OperationalJobInfo.java  | 107 ++++++++++++++++
 .../sidecar/job/OperationalJobManager.java         |   8 +-
 .../sidecar/job/OperationalJobTracker.java         |   4 +-
 .../sidecar/utils/OperationalJobUtils.java         |   8 +-
 .../handlers/ListOperationalJobsHandlerTest.java   |   3 +-
 .../handlers/OperationalJobHandlerTest.java        |   3 +-
 .../sidecar/job/OperationalJobInfoTest.java        | 142 +++++++++++++++++++++
 11 files changed, 316 insertions(+), 26 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 82e150ff..c6fb4c35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Refactor OperationalJob to have data separate from execution logic (CASSSIDECAR-460)
  * Sidecar’s CassandraBridgeFactory FQCN colliding with the Cassandra analytics class (CASSSIDECAR-467)
  * Fix ON_CDC_CACHE_WARMED_UP not fired when schema publisher fails (CASSSIDECAR-459)
  * Add SidecarReplicationFactorSupplier for live replication factor lookups (CASSSIDECAR-458)
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
index 44925c72..a1367ade 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandler.java
@@ -30,7 +30,7 @@ import io.vertx.ext.auth.authorization.Authorization;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
-import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobInfo;
 import org.apache.cassandra.sidecar.job.OperationalJobManager;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
@@ -75,7 +75,7 @@ public class OperationalJobHandler extends AbstractHandler<UUID> implements Acce
     {
         executorPools.service()
                      .executeBlocking(() -> {
-                         OperationalJob job = jobManager.getJobIfExists(jobId);
+                         OperationalJobInfo job = jobManager.getJobIfExists(jobId);
                          if (job == null)
                          {
                              logger.info("No operational job found with the jobId. jobId={}", jobId);
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
index f37bc769..bbdd4777 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/InMemoryOperationalJobTracker.java
@@ -96,7 +96,7 @@ public class InMemoryOperationalJobTracker implements OperationalJobTracker
     }
 
     @Override
-    public OperationalJob get(UUID key)
+    public OperationalJobInfo get(UUID key)
     {
         return map.get(key);
     }
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
index 8fad806f..58320e4e 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
@@ -42,7 +42,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * An abstract class representing operational jobs that run on Cassandra
  */
-public abstract class OperationalJob implements Task<Void>
+public abstract class OperationalJob implements Task<Void>, OperationalJobInfo
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(OperationalJob.class);
 
@@ -88,30 +88,37 @@ public abstract class OperationalJob implements Task<Void>
         this.nodesFailed = Collections.emptyList();
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public UUID jobId()
     {
         return jobId;
     }
 
     /**
-     * @return the node UUID associated with this job, or {@code null} if not applicable
+     * {@inheritDoc}
      */
+    @Override
     public @Nullable UUID nodeId()
     {
         return nodeId;
     }
 
     /**
-     * @return the time this job started execution, or {@code null} if not yet started
+     * {@inheritDoc}
      */
+    @Override
     public @Nullable Instant startTime()
     {
         return startTime;
     }
 
     /**
-     * @return the time of the last status update for this job, or {@code null} if not yet started
+     * {@inheritDoc}
      */
+    @Override
     public @Nullable Instant lastUpdate()
     {
         return lastUpdate;
@@ -128,8 +135,9 @@ public abstract class OperationalJob implements Task<Void>
     }
 
     /**
-     * @return the list of nodes pending execution for this job
+     * {@inheritDoc}
      */
+    @Override
     @NotNull
     public List<UUID> nodesPending()
     {
@@ -137,8 +145,9 @@ public abstract class OperationalJob implements Task<Void>
     }
 
     /**
-     * @return the list of nodes currently executing this job
+     * {@inheritDoc}
      */
+    @Override
     @NotNull
     public List<UUID> nodesExecuting()
     {
@@ -146,8 +155,9 @@ public abstract class OperationalJob implements Task<Void>
     }
 
     /**
-     * @return the list of nodes that have succeeded executing this job
+     * {@inheritDoc}
      */
+    @Override
     @NotNull
     public List<UUID> nodesSucceeded()
     {
@@ -155,8 +165,9 @@ public abstract class OperationalJob implements Task<Void>
     }
 
     /**
-     * @return the list of nodes that have failed executing this job
+     * {@inheritDoc}
      */
+    @Override
     @NotNull
     public List<UUID> nodesFailed()
     {
@@ -179,16 +190,18 @@ public abstract class OperationalJob implements Task<Void>
     }
 
     /**
-     * @return unix timestamp of the job creation time in milliseconds
+     * {@inheritDoc}
      */
+    @Override
     public long creationTime()
     {
         return UUIDs.unixTimestamp(jobId);
     }
 
     /**
-     * @return whether the operational job is executing or not.
+     * {@inheritDoc}
      */
+    @Override
     public boolean isExecuting()
     {
         return isExecuting;
@@ -216,6 +229,16 @@ public abstract class OperationalJob implements Task<Void>
      */
     public abstract boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs);
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String name()
+    {
+        String simpleName = this.getClass().getSimpleName();
+        return simpleName.isEmpty() ? this.getClass().getName() : simpleName;
+    }
+
     /**
      * Determines the status of the job. OperationalJob subclasses could choose to override the method.
      * <p>
@@ -229,6 +252,7 @@ public abstract class OperationalJob implements Task<Void>
      *
      * @return status of the OperationalJob execution
      */
+    @Override
     public OperationalJobStatus status()
     {
         Future<Void> fut = asyncResult();
@@ -256,6 +280,22 @@ public abstract class OperationalJob implements Task<Void>
         return executionPromise.future();
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @Nullable
+    public String failureReason()
+    {
+        Future<Void> fut = asyncResult();
+        if (fut.isComplete() && fut.failed() && fut.cause() != null)
+        {
+            String message = fut.cause().getMessage();
+            return message != null ? message : String.format("Encountered %s during job execution", fut.cause().getClass().getName());
+        }
+        return null;
+    }
+
     /**
      * Get the async result with waiting for at most the specified wait time
      * <p>
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobInfo.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobInfo.java
new file mode 100644
index 00000000..1748e3db
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobInfo.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A read-only view of operational job data. Provides state accessors needed by handlers
+ * and utilities to inspect job status and report results, without exposing execution logic.
+ */
+public interface OperationalJobInfo
+{
+    /**
+     * @return the unique identifier for the job
+     */
+    UUID jobId();
+
+    /**
+     * @return the node UUID associated with the job, or {@code null} for jobs spanning multiple nodes
+     */
+    @Nullable
+    UUID nodeId();
+
+    /**
+     * @return the name of the operation the job performs
+     */
+    String name();
+
+    /**
+     * @return the current status of the job
+     */
+    OperationalJobStatus status();
+
+    /**
+     * @return unix timestamp of the job creation time in milliseconds
+     */
+    long creationTime();
+
+    /**
+     * @return the time this job started execution, or {@code null} if not yet started
+     */
+    @Nullable
+    Instant startTime();
+
+    /**
+     * @return list of node UUIDs pending execution of the job
+     */
+    @NotNull
+    List<UUID> nodesPending();
+
+    /**
+     * @return list of node UUIDs currently executing the job
+     */
+    @NotNull
+    List<UUID> nodesExecuting();
+
+    /**
+     * @return list of node UUIDs that have succeeded executing the job
+     */
+    @NotNull
+    List<UUID> nodesSucceeded();
+
+    /**
+     * @return list of node UUIDs that have failed executing the job
+     */
+    @NotNull
+    List<UUID> nodesFailed();
+
+    /**
+     * @return the time of the last status update, or {@code null} if not yet started
+     */
+    @Nullable
+    Instant lastUpdate();
+
+    /**
+     * @return whether the job is currently executing
+     */
+    boolean isExecuting();
+
+    /**
+     * @return the failure reason if the job has failed, or {@code null} if the job is still in progress or succeeded
+     */
+    @Nullable
+    String failureReason();
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
index 0e70e1e6..ccaed374 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
@@ -61,11 +61,11 @@ public class OperationalJobManager
      *
      * @return instances of the jobs that are in pending or running states
      */
-    public List<OperationalJob> allInflightJobs()
+    public List<OperationalJobInfo> allInflightJobs()
     {
         return jobTracker.jobsView().values()
                          .stream()
-                         .filter(j -> !j.asyncResult().isComplete())
+                         .filter(j -> !j.status().isCompleted())
                          .collect(Collectors.toList());
     }
 
@@ -73,9 +73,9 @@ public class OperationalJobManager
      * Fetch the job using its UUID
      *
      * @param jobId identifier of the job
-     * @return instance of the job or null
+     * @return instance of the job info or null
      */
-    public OperationalJob getJobIfExists(UUID jobId)
+    public OperationalJobInfo getJobIfExists(UUID jobId)
     {
         return jobTracker.get(jobId);
     }
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
index 4d536954..5dcb4283 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
@@ -47,9 +47,9 @@ public interface OperationalJobTracker
      * Retrieve a job by its ID.
      *
      * @param jobId the job identifier
-     * @return the job, or null if not found
+     * @return the job info, or null if not found
      */
-    OperationalJob get(UUID jobId);
+    OperationalJobInfo get(UUID jobId);
 
     /**
      * Returns an immutable view of all tracked jobs.
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
index 52d43262..54c45cd9 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
@@ -26,7 +26,7 @@ import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
 import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
-import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobInfo;
 
 import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
 
@@ -47,7 +47,7 @@ public class OperationalJobUtils
      * @param job       the operational job to report status on
      * @param exception the conflict exception, if any (null if no conflict)
      */
-    public static void sendStatusBasedResponse(RoutingContext context, OperationalJob job, OperationalJobConflictException exception)
+    public static void sendStatusBasedResponse(RoutingContext context, OperationalJobInfo job, OperationalJobConflictException exception)
     {
         if (exception != null)
         {
@@ -81,9 +81,9 @@ public class OperationalJobUtils
         }
 
         String reason = null;
-        if (status == FAILED && job.asyncResult() != null && job.asyncResult().cause() != null)
+        if (status == FAILED)
         {
-            reason = job.asyncResult().cause().getMessage();
+            reason = job.failureReason();
         }
         context.json(OperationalJobResponse.builder()
                                            .jobId(job.jobId())
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java
index 5a36bfde..91159bd0 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/ListOperationalJobsHandlerTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.sidecar.TestModule;
 import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
 import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
 import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobInfo;
 import org.apache.cassandra.sidecar.job.OperationalJobManager;
 import org.apache.cassandra.sidecar.modules.SidecarModules;
 import org.apache.cassandra.sidecar.server.Server;
@@ -128,7 +129,7 @@ class ListOperationalJobsHandlerTest
         @Singleton
         public OperationalJobManager jobManager()
         {
-            List<OperationalJob> testJobs = Arrays.asList(running, running2);
+            List<OperationalJobInfo> testJobs = Arrays.asList(running, running2);
             OperationalJobManager mockManager = mock(OperationalJobManager.class);
             when(mockManager.allInflightJobs()).thenReturn(testJobs);
             return mockManager;
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java
index 23efcfc5..8b4f2ef1 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/OperationalJobHandlerTest.java
@@ -37,7 +37,6 @@ import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.util.Modules;
-import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.ext.web.client.WebClient;
@@ -183,7 +182,7 @@ class OperationalJobHandlerTest
             OperationalJob failedMock = mock(OperationalJob.class);
             when(failedMock.jobId()).thenReturn(failedUuid);
             when(failedMock.status()).thenReturn(OperationalJobStatus.FAILED);
-            when(failedMock.asyncResult()).thenReturn(Future.failedFuture("Test failed"));
+            when(failedMock.failureReason()).thenReturn("Test failed");
             when(failedMock.name()).thenReturn("testFailed");
 
             when(mockManager.getJobIfExists(runningUuid)).thenReturn(runningMock);
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobInfoTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobInfoTest.java
new file mode 100644
index 00000000..19b3a9eb
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobInfoTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for the {@link OperationalJobInfo} interface contract on {@link OperationalJob}
+ */
+class OperationalJobInfoTest
+{
+    @Test
+    void testFailureReasonReturnsMessageForFailedJob()
+    {
+        String expectedMessage = "something went wrong";
+        OperationalJob job = createFailingJob(expectedMessage);
+        Promise<Void> promise = Promise.promise();
+        job.execute(promise);
+
+        assertThat(job.failureReason()).isEqualTo(expectedMessage);
+    }
+
+    @Test
+    void testFailureReasonReturnsNullForSucceededJob()
+    {
+        OperationalJob job = createSucceedingJob();
+        Promise<Void> promise = Promise.promise();
+        job.execute(promise);
+
+        assertThat(job.asyncResult().succeeded()).isTrue();
+        assertThat(job.failureReason()).isNull();
+    }
+
+    @Test
+    void testFailureReasonReturnsNullForCreatedJob()
+    {
+        OperationalJob job = createSucceedingJob();
+
+        assertThat(job.asyncResult().isComplete()).isFalse();
+        assertThat(job.failureReason()).isNull();
+    }
+
+    @Test
+    void testNameReturnsFullNameForAnonymousClass()
+    {
+        OperationalJob job = createSucceedingJob();
+
+        assertThat(job.name()).isNotEmpty();
+        assertThat(job.name()).contains("OperationalJobInfoTest");
+    }
+
+    @Test
+    void testNameReturnsSimpleNameForNamedClass()
+    {
+        OperationalJob job = new NamedJob(UUIDs.timeBased());
+
+        assertThat(job.name()).isEqualTo("NamedJob");
+    }
+
+    static class NamedJob extends OperationalJob
+    {
+        NamedJob(UUID jobId)
+        {
+            super(jobId);
+        }
+
+        @Override
+        public boolean hasConflict(List<OperationalJob> sameOperationJobs)
+        {
+            return false;
+        }
+
+        @Override
+        protected Future<Void> executeInternal()
+        {
+            return Future.succeededFuture();
+        }
+    }
+
+    private static OperationalJob createFailingJob(String failureMessage)
+    {
+        return new OperationalJob(UUIDs.timeBased())
+        {
+            @Override
+            public boolean hasConflict(List<OperationalJob> sameOperationJobs)
+            {
+                return false;
+            }
+
+            @Override
+            protected Future<Void> executeInternal() throws OperationalJobException
+            {
+                throw new OperationalJobException(failureMessage);
+            }
+
+        };
+    }
+
+    private static OperationalJob createSucceedingJob()
+    {
+        return new OperationalJob(UUIDs.timeBased())
+        {
+            @Override
+            public boolean hasConflict(List<OperationalJob> sameOperationJobs)
+            {
+                return false;
+            }
+
+            @Override
+            protected Future<Void> executeInternal()
+            {
+                return Future.succeededFuture();
+            }
+        };
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to