This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2210e8be CASSANALYTICS-81: Unique restoreJobId per write target (#134)
2210e8be is described below

commit 2210e8be93b162bb201ea0b89131ea555bd4e7e1
Author: Yifan Cai <[email protected]>
AuthorDate: Thu Jul 31 16:36:22 2025 -0700

    CASSANALYTICS-81: Unique restoreJobId per write target (#134)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANALYTICS-81
---
 CHANGES.txt                                        |   1 +
 .../sidecar/common/data/RestoreJobConstants.java   |   2 +-
 .../data/CreateRestoreJobRequestPayload.java       |   2 +-
 .../bulkwriter/AbstractBulkWriterContext.java      |  12 +-
 .../bulkwriter/CassandraBulkSourceRelation.java    |  32 +++--
 .../bulkwriter/CassandraBulkWriterContext.java     |   9 ++
 .../spark/bulkwriter/CassandraJobInfo.java         |  28 +++-
 .../apache/cassandra/spark/bulkwriter/JobInfo.java |  12 ++
 .../CassandraCloudStorageTransportContext.java     |   3 +-
 .../CloudStorageDataTransferApiFactory.java        |  13 +-
 .../CloudStorageDataTransferApiImpl.java           |  34 +++--
 .../cloudstorage/CloudStorageStreamSession.java    |   2 +-
 .../CassandraCoordinatedBulkWriterContext.java     |  10 ++
 .../CoordinatedCloudStorageDataTransferApi.java    |  12 +-
 .../coordinated/CoordinatedImportCoordinator.java  |   6 +-
 .../coordinated/MultiClusterContainer.java         |   7 +
 .../spark/bulkwriter/CassandraJobInfoTest.java     | 143 +++++++++++++++++++++
 .../spark/bulkwriter/MockBulkWriterContext.java    |   7 +
 .../CloudStorageStreamSessionTest.java             |   2 +-
 .../ImportCompletionCoordinatorTest.java           |   2 +-
 .../CoordinatedImportCoordinatorTest.java          |  27 ++--
 .../coordinated/MultiClusterContainerTest.java     |  43 +++++++
 22 files changed, 350 insertions(+), 59 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 43411d1b..fdc8fed0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Unique restoreJobId per write target (CASSANALYTICS-81)
  * Allow writing to local datacenter only for coordinated write 
(CASSANALYTICS-75)
 
 0.1.0
diff --git 
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
 
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
index 3b665dae..a6f76215 100644
--- 
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
+++ 
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
@@ -44,7 +44,7 @@ public class RestoreJobConstants
      * datacenter, but the table replicates to multiple datacenters, it could 
cause a large amount of repair streaming
      * traffic. Arguably, you might want to leverage the intra-node repair 
feature, then you can enable this option.
      * Another use case that could justify the option is running distinct 
restore jobs, one per datacenter, concurrently.
-     * In this case, there is an external coordinator that manges the restore 
job in each datacenter.
+     * In this case, there is an external coordinator that manages the restore 
job in each datacenter.
      */
     public static final String JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY = 
"restoreToLocalDatacenterOnly";
     public static final String SLICE_ID = "sliceId";
diff --git 
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
 
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
index 69142223..0e7fc1e3 100644
--- 
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
+++ 
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
@@ -76,7 +76,7 @@ public class CreateRestoreJobRequestPayload
      * CreateRestoreJobRequest deserializer
      *
      * @param jobId            job id of restore job
-     * @param jobAgent         arbitrary text a job can put, which can be used 
to identity itself during Http request
+     * @param jobAgent         arbitrary text a job can put, which can be used 
to identify itself during Http request
      * @param secrets          secrets to be used by restore job to download 
data
      * @param importOptions    the configured options for SSTable import
      * @param expireAtInMillis a timestamp in the future when the job is 
considered expired
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index f464642c..87e5a979 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -32,6 +32,7 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
 import org.apache.cassandra.spark.common.stats.LogStatsPublisher;
@@ -104,14 +105,21 @@ public abstract class AbstractBulkWriterContext 
implements BulkWriterContext, Kr
         validateKeyspaceReplication();
         BulkSparkConf conf = bulkSparkConf();
         TokenRangeMapping<RingInstance> tokenRangeMapping = 
cluster().getTokenRangeMapping(true);
-        UUID restoreJobId = bridge().getTimeUUID(); // used for creating 
restore job on sidecar
         TokenPartitioner tokenPartitioner = new 
TokenPartitioner(tokenRangeMapping,
                                                                  
conf.numberSplits,
                                                                  
sparkDefaultParallelism(),
                                                                  
conf.getCores());
-        return new CassandraJobInfo(conf, restoreJobId, tokenPartitioner);
+        return new CassandraJobInfo(conf, generateRestoreJobIds(), 
tokenPartitioner);
     }
 
+    /**
+     * Generate the restore job IDs used in the receiving Cassandra Sidecar 
clusters.
+     * In the coordinated write mode, there should be a unique uuid per 
cluster;
+     * In the single-cluster write mode, the MultiClusterContainer would contain a single entry.
+     * @return restore job ids that are unique per cluster
+     */
+    protected abstract MultiClusterContainer<UUID> generateRestoreJobIds();
+
     protected CassandraBridge buildCassandraBridge()
     {
         return CassandraBridgeFactory.get(lowestCassandraVersion());
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index c5e255b3..40d1b330 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -41,11 +40,11 @@ import 
o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobRequestPa
 import 
o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageDataTransferApi;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult;
-import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
-import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator;
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import 
org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
@@ -279,7 +278,11 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
         context.transportExtensionImplementation()
                .onAllObjectsPersisted(objectCount, rowCount, 
elapsedTimeMillis());
 
-        setSliceCountForRestoreJob(context, objectCount);
+        // only need to set the slice count (for better progress tracking) 
when in coordinated write mode
+        if (writerContext.job().isCoordinatedWriteEnabled())
+        {
+            setSliceCountForRestoreJob(context, objectCount);
+        }
 
         awaitImportCompletion(context, resultsAsCloudStorageStreamResults);
         markRestoreJobAsSucceeded(context);
@@ -460,7 +463,7 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
                                             .toRestoreJobSecrets();
             CoordinatedWriteConf.ClusterConf cluster = 
coordinatedWriteConf.cluster(clusterId);
             String localDc = cluster.resolveLocalDc(cl); // resolve the 
cluster specific localDc name
-            CreateRestoreJobRequestPayload payload = 
createJobPayloadBuilder(job, secrets)
+            CreateRestoreJobRequestPayload payload = 
createJobPayloadBuilder(job, secrets, clusterId)
                                                      
.consistencyLevel(toSidecarConsistencyLevel(cl), localDc)
                                                      // TODO: add test case 
once we advance the sidecar version that understands the flag,
                                                      //       or a better 
testing strategy is figured out
@@ -471,10 +474,15 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
     }
 
     private CreateRestoreJobRequestPayload.Builder 
createJobPayloadBuilder(JobInfo job, RestoreJobSecrets secrets)
+    {
+        return createJobPayloadBuilder(job, secrets, null);
+    }
+
+    private CreateRestoreJobRequestPayload.Builder 
createJobPayloadBuilder(JobInfo job, RestoreJobSecrets secrets, String 
clusterId)
     {
         CreateRestoreJobRequestPayload.Builder builder = 
CreateRestoreJobRequestPayload.builder(secrets, updatedLeaseTime());
         builder.jobAgent(BuildInfo.APPLICATION_NAME)
-               .jobId(job.getRestoreJobId())
+               .jobId(job.getRestoreJobId(clusterId))
                .updateImportOptions(importOptions -> {
                    importOptions.verifySSTables(true) // we disallow the 
end-user to bypass the non-extended verify anymore
                                 .extendedVerify(false); // always turn off
@@ -490,15 +498,14 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
     private void 
setSliceCountForRestoreJob(TransportContext.CloudStorageTransportContext 
context, long sliceCount)
     {
         UpdateRestoreJobRequestPayload requestPayload = 
UpdateRestoreJobRequestPayload.builder().withSliceCount(sliceCount).build();
-        UUID jobId = writerContext.job().getRestoreJobId();
         try
         {
-            LOGGER.info("Setting slice count for the restore job. jobId={} 
sliceCount={}", jobId, sliceCount);
+            LOGGER.info("Setting slice count for the restore job. 
sliceCount={}", sliceCount);
             context.dataTransferApi().updateRestoreJob(requestPayload);
         }
         catch (Exception e)
         {
-            LOGGER.warn("Failed to set slice count for the restore job. 
jobId={}", jobId, e);
+            LOGGER.warn("Failed to set slice count for the restore job.", e);
             // Do not rethrow - avoid triggering the catch block at the 
call-site that marks job as failed.
         }
     }
@@ -506,17 +513,16 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
     private void 
markRestoreJobAsSucceeded(TransportContext.CloudStorageTransportContext context)
     {
         UpdateRestoreJobRequestPayload requestPayload = 
UpdateRestoreJobRequestPayload.builder().withStatus(RestoreJobStatus.SUCCEEDED).build();
-        UUID jobId = writerContext.job().getRestoreJobId();
         try
         {
-            LOGGER.info("Marking the restore job as succeeded. jobId={}", 
jobId);
+            LOGGER.info("Marking the restore job as succeeded.");
             // Prioritize the call to extension, so onJobSucceeded is always 
invoked.
             
context.transportExtensionImplementation().onJobSucceeded(elapsedTimeMillis());
             context.dataTransferApi().updateRestoreJob(requestPayload);
         }
         catch (Exception e)
         {
-            LOGGER.warn("Failed to mark the restore job as succeeded. 
jobId={}", jobId, e);
+            LOGGER.warn("Failed to mark the restore job as succeeded.", e);
             // Do not rethrow - avoid triggering the catch block at the 
call-site that marks job as failed.
         }
     }
@@ -525,8 +531,6 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
     {
         // Prioritize the call to extension, so onJobFailed is always invoked.
         
context.transportExtensionImplementation().onJobFailed(elapsedTimeMillis(), 
cause);
-        UUID jobId = writerContext.job().getRestoreJobId();
-        LOGGER.info("Aborting job. jobId={}", jobId);
         context.dataTransferApi().abortRestoreJob();
     }
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index debf0eb4..2eea6276 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -19,9 +19,12 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
+import java.util.UUID;
+
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
@@ -57,4 +60,10 @@ public class CassandraBulkWriterContext extends 
AbstractBulkWriterContext
                                         && 
cluster().replicationFactor().getOptions().containsKey(conf.localDC);
         Preconditions.checkState(isReplicatedToLocalDc, "Keyspace %s is not 
replicated on datacenter %s", conf.keyspace, conf.localDC);
     }
+
+    @Override
+    protected MultiClusterContainer<UUID> generateRestoreJobIds()
+    {
+        return MultiClusterContainer.ofSingle(bridge().getTimeUUID());
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 3759ea1c..27b8eb45 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -19,11 +19,14 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
+import java.util.NoSuchElementException;
 import java.util.UUID;
 
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.data.QualifiedTableName;
+import org.apache.cassandra.spark.utils.Preconditions;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -31,12 +34,14 @@ public class CassandraJobInfo implements JobInfo
 {
     private static final long serialVersionUID = 6140098484732683759L;
     protected final BulkSparkConf conf;
-    protected final UUID restoreJobId;
+    // restoreJobId per cluster; it is guaranteed to be non-empty
+    protected final MultiClusterContainer<UUID> restoreJobIds;
     protected final TokenPartitioner tokenPartitioner;
 
-    public CassandraJobInfo(BulkSparkConf conf, UUID restoreJobId, 
TokenPartitioner tokenPartitioner)
+    public CassandraJobInfo(BulkSparkConf conf, MultiClusterContainer<UUID> 
restoreJobIds, TokenPartitioner tokenPartitioner)
     {
-        this.restoreJobId = restoreJobId;
+        Preconditions.checkArgument(restoreJobIds.size() > 0, "restoreJobIds 
cannot be empty");
+        this.restoreJobIds = restoreJobIds;
         this.conf = conf;
         this.tokenPartitioner = tokenPartitioner;
     }
@@ -128,7 +133,22 @@ public class CassandraJobInfo implements JobInfo
     @Override
     public UUID getRestoreJobId()
     {
-        return restoreJobId;
+        // get the id, assuming it is the single-cluster scenario; using `null` to retrieve.
+        try
+        {
+            return getRestoreJobId(null);
+        }
+        catch (NoSuchElementException nsee)
+        {
+            // if it is a multi-cluster scenario, the id would be null; in this case, just retrieve any value
+            return restoreJobIds.getAnyValue();
+        }
+    }
+
+    @Override
+    public UUID getRestoreJobId(@Nullable String clusterId)
+    {
+        return restoreJobIds.getValueOrThrow(clusterId);
     }
 
     @Override
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index bbbb03b7..2c4a089e 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.spark.bulkwriter;
 
 import java.io.Serializable;
+import java.util.NoSuchElementException;
 import java.util.UUID;
 
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
@@ -52,10 +53,21 @@ public interface JobInfo extends Serializable
      */
     UUID getRestoreJobId();
 
+    /**
+     * Returns the restore job identifier on Cassandra Sidecar of the cluster identified by the clusterId.
+     * The method should be called in the coordinated write code path.
+     *
+     * @param clusterId identifies the Cassandra cluster
+     * @return restore job identifier, a time-based uuid
+     * @throws NoSuchElementException when there is no restoreJobId associated 
with the clusterId
+     */
+    UUID getRestoreJobId(@Nullable String clusterId) throws 
NoSuchElementException;
+
     /**
      * An optional unique identifier supplied in spark configuration
      * @return an id string or null
      */
+    @Nullable
     String getConfiguredJobId();
 
     // Convenient method to decide a unique identifier used for the job.
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
index 5f881b9d..9184dd5a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
@@ -136,8 +136,7 @@ public class CassandraCloudStorageTransportContext 
implements TransportContext.C
         {
             Class<StorageTransportExtension> clazz = 
(Class<StorageTransportExtension>) Class.forName(transportExtensionClass);
             StorageTransportExtension extension = 
clazz.getConstructor().newInstance();
-            LOGGER.info("Initializing storage transport extension. jobId={}, 
restoreJobId={}",
-                        jobInfo.getId(), jobInfo.getRestoreJobId());
+            LOGGER.info("Initializing storage transport extension. jobId={}", 
jobInfo.getId());
             extension.initialize(jobInfo.getId(), conf.getSparkConf(), 
isOnDriver);
             // Only assign when initialization is complete
             // to avoid exposing uninitialized extension object, which leads 
to unexpected behavior
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
index a20efb9f..192c045e 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.JobInfo;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
+import org.jetbrains.annotations.Nullable;
 
 public class CloudStorageDataTransferApiFactory
 {
@@ -46,15 +47,19 @@ public class CloudStorageDataTransferApiFactory
         }
         else
         {
-            return createForSingleCluster(storageClient, jobInfo, clusterInfo);
+            return createForSingleCluster(storageClient, jobInfo, clusterInfo, 
null);
         }
     }
 
-    private CloudStorageDataTransferApiImpl 
createForSingleCluster(StorageClient storageClient, JobInfo jobInfo, 
ClusterInfo clusterInfo)
+    private CloudStorageDataTransferApiImpl 
createForSingleCluster(StorageClient storageClient,
+                                                                   JobInfo 
jobInfo,
+                                                                   ClusterInfo 
clusterInfo,
+                                                                   @Nullable 
String clusterId)
     {
         return new CloudStorageDataTransferApiImpl(jobInfo,
                                                    
clusterInfo.getCassandraContext().getSidecarClient(),
-                                                   storageClient);
+                                                   storageClient,
+                                                   clusterId);
     }
 
     private CoordinatedCloudStorageDataTransferApi 
createForCoordinated(StorageClient storageClient,
@@ -63,7 +68,7 @@ public class CloudStorageDataTransferApiFactory
     {
         Map<String, CloudStorageDataTransferApiImpl> apiByClusterId = new 
HashMap<>(clusterInfoGroup.size());
         clusterInfoGroup.forEach((clusterId, clusterInfo) -> {
-            apiByClusterId.put(clusterId, 
createForSingleCluster(storageClient, jobInfo, clusterInfo));
+            apiByClusterId.put(clusterId, 
createForSingleCluster(storageClient, jobInfo, clusterInfo, clusterId));
         });
 
         return new CoordinatedCloudStorageDataTransferApi(apiByClusterId);
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
index 3c44ccdb..b09cca5a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.spark.bulkwriter.cloudstorage;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 import org.slf4j.Logger;
@@ -42,6 +43,7 @@ import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.apache.cassandra.spark.exception.S3ApiCallException;
 import org.apache.cassandra.spark.exception.SidecarApiCallException;
 import org.apache.cassandra.spark.transports.storage.StorageCredentials;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Encapsulates transfer APIs used by {@link CloudStorageStreamSession}. 
{@link StorageClient} is used to interact with S3 and
@@ -49,15 +51,20 @@ import 
org.apache.cassandra.spark.transports.storage.StorageCredentials;
  */
 public class CloudStorageDataTransferApiImpl implements 
CloudStorageDataTransferApi
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CloudStorageDataTransferApiImpl.class);
+
+    @Nullable
+    private final String clusterId;
     private final JobInfo jobInfo;
     private final SidecarClient sidecarClient;
     private final StorageClient storageClient;
 
-    public CloudStorageDataTransferApiImpl(JobInfo jobInfo, SidecarClient 
sidecarClient, StorageClient storageClient)
+    public CloudStorageDataTransferApiImpl(JobInfo jobInfo, SidecarClient 
sidecarClient, StorageClient storageClient, @Nullable String clusterId)
     {
         this.jobInfo = jobInfo;
         this.sidecarClient = sidecarClient;
         this.storageClient = storageClient;
+        this.clusterId = clusterId;
     }
 
     public JobInfo jobInfo()
@@ -91,6 +98,7 @@ public class CloudStorageDataTransferApiImpl implements 
CloudStorageDataTransfer
     @Override
     public void createRestoreJob(CreateRestoreJobRequestPayload 
createRestoreJobRequestPayload) throws SidecarApiCallException
     {
+        UUID jobId = jobInfo.getRestoreJobId(clusterId);
         try
         {
             QualifiedTableName qualifiedTableName = 
jobInfo.qualifiedTableName();
@@ -101,24 +109,25 @@ public class CloudStorageDataTransferApiImpl implements 
CloudStorageDataTransfer
         catch (Exception exception)
         {
             handleInterruption(exception);
-            throw new SidecarApiCallException("Failed to create a new restore 
job. restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+            throw new SidecarApiCallException("Failed to create a new restore 
job. restoreJobId=" + jobId, exception);
         }
     }
 
     @Override
     public RestoreJobSummaryResponsePayload restoreJobSummary() throws 
SidecarApiCallException
     {
+        UUID jobId = jobInfo.getRestoreJobId(clusterId);
         try
         {
             QualifiedTableName qualifiedTableName = 
jobInfo.qualifiedTableName();
             return 
sidecarClient.restoreJobSummary(qualifiedTableName.keyspace(),
                                                    qualifiedTableName.table(),
-                                                   
jobInfo.getRestoreJobId()).get();
+                                                   jobId).get();
         }
         catch (Exception exception)
         {
             handleInterruption(exception);
-            throw new SidecarApiCallException("Failed to retrieve restore job 
summary. restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+            throw new SidecarApiCallException("Failed to retrieve restore job 
summary. restoreJobId=" + jobId, exception);
         }
     }
 
@@ -126,6 +135,7 @@ public class CloudStorageDataTransferApiImpl implements 
CloudStorageDataTransfer
     public void createRestoreSliceFromExecutor(SidecarInstance sidecarInstance,
                                                CreateSliceRequestPayload 
createSliceRequestPayload) throws SidecarApiCallException
     {
+        UUID jobId = jobInfo.getRestoreJobId(clusterId);
         try
         {
             createRestoreSliceWithCustomRetry(sidecarInstance, 
createSliceRequestPayload, new ExecutorCreateSliceRetryPolicy(sidecarClient))
@@ -135,7 +145,7 @@ public class CloudStorageDataTransferApiImpl implements 
CloudStorageDataTransfer
         {
             handleInterruption(exception);
             throw new SidecarApiCallException("Failed to create restore slice. 
" +
-                                              "restoreJobId=" + 
jobInfo.getRestoreJobId() +
+                                              "restoreJobId=" + jobId +
                                               " payload=" + 
createSliceRequestPayload,
                                               exception);
         }
@@ -152,35 +162,39 @@ public class CloudStorageDataTransferApiImpl implements 
CloudStorageDataTransfer
     @Override
     public void updateRestoreJob(UpdateRestoreJobRequestPayload 
updateRestoreJobRequestPayload) throws SidecarApiCallException
     {
+        UUID jobId = jobInfo.getRestoreJobId(clusterId);
         try
         {
+            LOGGER.info("Updating the restore job. clusterId={} 
restoreJobId={}", clusterId, jobId);
             QualifiedTableName qualifiedTableName = 
jobInfo.qualifiedTableName();
             sidecarClient.updateRestoreJob(qualifiedTableName.keyspace(),
                                            qualifiedTableName.table(),
-                                           jobInfo.getRestoreJobId(),
+                                           jobId,
                                            
updateRestoreJobRequestPayload).get();
         }
         catch (Exception exception)
         {
             handleInterruption(exception);
-            throw new SidecarApiCallException("Failed to update restore job. 
restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+            throw new SidecarApiCallException("Failed to update restore job. 
restoreJobId=" + jobId, exception);
         }
     }
 
     @Override
     public void abortRestoreJob() throws SidecarApiCallException
     {
+        UUID jobId = jobInfo.getRestoreJobId(clusterId);
         try
         {
+            LOGGER.info("Abort job. clusterId={} restoreJobId={}", clusterId, 
jobId);
             QualifiedTableName qualifiedTableName = 
jobInfo.qualifiedTableName();
             sidecarClient.abortRestoreJob(qualifiedTableName.keyspace(),
                                           qualifiedTableName.table(),
-                                          jobInfo.getRestoreJobId()).get();
+                                          jobId).get();
         }
         catch (Exception exception)
         {
             handleInterruption(exception);
-            throw new SidecarApiCallException("Failed to abort restore job. 
restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+            throw new SidecarApiCallException("Failed to abort restore job. 
restoreJobId=" + jobId, exception);
         }
     }
 
@@ -194,7 +208,7 @@ public class CloudStorageDataTransferApiImpl implements 
CloudStorageDataTransfer
         QualifiedTableName qualifiedTableName = jobInfo.qualifiedTableName();
         CreateRestoreJobSliceRequest request = new 
CreateRestoreJobSliceRequest(qualifiedTableName.keyspace(),
                                                                                
 qualifiedTableName.table(),
-                                                                               
 jobInfo.getRestoreJobId(),
+                                                                               
 jobInfo.getRestoreJobId(clusterId),
                                                                                
 createSliceRequestPayload);
         return sidecarClient.executeRequestAsync(sidecarClient.requestBuilder()
                                                               
.retryPolicy(retryPolicy)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
index 964c4edf..0dcebbaf 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
@@ -95,7 +95,7 @@ public class CloudStorageStreamSession extends 
StreamSession<TransportContext.Cl
 
         JobInfo job = bulkWriterContext.job();
         long maxSizePerBundleInBytes = 
job.transportInfo().getMaxSizePerBundleInBytes();
-        this.bundleNameGenerator = new 
BundleNameGenerator(job.getRestoreJobId().toString(), sessionID);
+        this.bundleNameGenerator = new BundleNameGenerator(job.getId(), 
sessionID);
         this.dataTransferApi = transportContext.dataTransferApi();
         this.bridge = bridge;
         QualifiedTableName qualifiedTableName = job.qualifiedTableName();
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
index a6722e30..70fa59c5 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
@@ -19,6 +19,8 @@
 
 package org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated;
 
+import java.util.UUID;
+
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 
@@ -82,6 +84,14 @@ public class CassandraCoordinatedBulkWriterContext extends 
AbstractBulkWriterCon
         });
     }
 
+    @Override
+    protected MultiClusterContainer<UUID> generateRestoreJobIds()
+    {
+        MultiClusterContainer<UUID> result = new MultiClusterContainer<>();
+        clusterInfoGroup().forEach((clusterId, ignored) -> 
result.setValue(clusterId, bridge().getTimeUUID()));
+        return result;
+    }
+
     protected CassandraClusterInfoGroup clusterInfoGroup()
     {
         return (CassandraClusterInfoGroup) cluster();
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
index 81a93895..a56d896f 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
@@ -121,7 +121,7 @@ public class CoordinatedCloudStorageDataTransferApi 
implements CloudStorageDataT
     @Override
     public void createRestoreSliceFromExecutor(String clusterId, 
CreateSliceRequestPayload createSliceRequestPayload) throws 
SidecarApiCallException
     {
-        
createRestoreSliceInternal(dataTransferApis.getValueOrThrow(clusterId), 
createSliceRequestPayload);
+        createRestoreSliceInternal(clusterId, 
dataTransferApis.getValueOrThrow(clusterId), createSliceRequestPayload);
     }
 
     @Override
@@ -161,15 +161,17 @@ public class CoordinatedCloudStorageDataTransferApi 
implements CloudStorageDataT
         throw new UnsupportedOperationException("Not supported for coordinated 
write");
     }
 
-    private void createRestoreSliceInternal(CloudStorageDataTransferApiImpl 
dataTransferApi,
+    private void createRestoreSliceInternal(String clusterId,
+                                            CloudStorageDataTransferApiImpl 
dataTransferApi,
                                             CreateSliceRequestPayload 
createSliceRequestPayload) throws SidecarApiCallException
     {
         JobInfo jobInfo = dataTransferApi.jobInfo();
         SidecarClient sidecarClient = dataTransferApi.sidecarClient();
         QualifiedTableName qualifiedTableName = jobInfo.qualifiedTableName();
+        UUID restoreJobId = jobInfo.getRestoreJobId(clusterId);
         CreateRestoreJobSliceRequest request = new 
CreateRestoreJobSliceRequest(qualifiedTableName.keyspace(),
                                                                                
 qualifiedTableName.table(),
-                                                                               
 jobInfo.getRestoreJobId(),
+                                                                               
 restoreJobId,
                                                                                
 createSliceRequestPayload);
         RetryPolicy retryPolicy = new 
CloudStorageDataTransferApiImpl.ExecutorCreateSliceRetryPolicy(sidecarClient);
         RequestContext requestContext = 
sidecarClient.requestBuilder().retryPolicy(retryPolicy).request(request).build();
@@ -181,7 +183,7 @@ public class CoordinatedCloudStorageDataTransferApi 
implements CloudStorageDataT
         {
             handleInterruption(exception);
             throw new SidecarApiCallException("Failed to create restore job 
slice. " +
-                                              "restoreJobId=" + 
jobInfo.getRestoreJobId() +
+                                              "restoreJobId=" + restoreJobId +
                                               " sliceId=" + 
createSliceRequestPayload.sliceId(),
                                               exception);
         }
@@ -194,7 +196,7 @@ public class CoordinatedCloudStorageDataTransferApi 
implements CloudStorageDataT
     {
         JobInfo jobInfo = dataTransferApi.jobInfo();
         QualifiedTableName qualifiedTableName = jobInfo.qualifiedTableName();
-        UUID restoreJobId = jobInfo.getRestoreJobId();
+        UUID restoreJobId = jobInfo.getRestoreJobId(clusterId);
         RestoreJobProgressRequestParams requestParams = new 
RestoreJobProgressRequestParams(qualifiedTableName.keyspace(),
                                                                                
             qualifiedTableName.table(),
                                                                                
             restoreJobId,
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
index f7286ae2..80639d30 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
@@ -227,7 +227,11 @@ public class CoordinatedImportCoordinator implements 
ImportCoordinator, Coordina
         completedClusters.add(clusterId);
         String error = String.format("Some of the token ranges cannot satisfy 
with consistency level. " +
                                      "job=%s phase=%s consistencyLevel=%s 
clusterId=%s ranges=%s",
-                                     job.getRestoreJobId(), 
determineReadyPhase(), job.getConsistencyLevel(), clusterId, 
progress.failedRanges());
+                                     job.getRestoreJobId(clusterId),
+                                     determineReadyPhase(),
+                                     job.getConsistencyLevel(),
+                                     clusterId,
+                                     progress.failedRanges());
         LOGGER.error(error);
         ConsistencyNotSatisfiedException exception = new 
ConsistencyNotSatisfiedException(error);
         failureHandler.accept(clusterId, exception);
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
index 4a9e58ac..a88b1cbc 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
@@ -48,6 +48,13 @@ public class MultiClusterContainer<T> implements 
Serializable, MultiClusterSuppo
     // For coordinated write, the key should be String values of clusterId
     private final Map<Object, T> byCluster = new ConcurrentHashMap<>();
 
+    public static <T> MultiClusterContainer<T> ofSingle(T value)
+    {
+        MultiClusterContainer<T> result = new MultiClusterContainer<>();
+        result.setValue(null, value);
+        return result;
+    }
+
     @Nullable
     @Override
     public T getValueOrNull(@Nullable String clusterId)
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfoTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfoTest.java
new file mode 100644
index 00000000..012fd7a4
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfoTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for CassandraJobInfo class focusing on restore job ID handling
+ * in single and multi-cluster scenarios.
+ */
+class CassandraJobInfoTest
+{
+    private static final String CLUSTER_1 = "cluster1";
+    private static final String CLUSTER_2 = "cluster2";
+    private final UUID cluster1JobId = UUID.randomUUID();
+    private final UUID cluster2JobId = UUID.randomUUID();
+
+    @Test
+    void testGetRestoreJobIdInSingleClusterScenario()
+    {
+        MultiClusterContainer<UUID> restoreJobIds = 
MultiClusterContainer.ofSingle(cluster1JobId);
+        CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+        assertThat(jobInfo.getRestoreJobId()).isEqualTo(cluster1JobId);
+        assertThat(jobInfo.getRestoreJobId(null)).isEqualTo(cluster1JobId);
+    }
+
+    @Test
+    void testGetRestoreJobIdWithClusterIdReturnsCorrectUuidForSpecificCluster()
+    {
+        MultiClusterContainer<UUID> restoreJobIds = new 
MultiClusterContainer<>();
+        restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+        restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+        CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+        
assertThat(jobInfo.getRestoreJobId(CLUSTER_1)).isEqualTo(cluster1JobId);
+        
assertThat(jobInfo.getRestoreJobId(CLUSTER_2)).isEqualTo(cluster2JobId);
+    }
+
+    @Test
+    void testGetRestoreJobIdThrowsNoSuchElement()
+    {
+        MultiClusterContainer<UUID> restoreJobIds = new 
MultiClusterContainer<>();
+        restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+        restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+        CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+        assertThatThrownBy(() -> jobInfo.getRestoreJobId(null))
+        .isInstanceOf(NoSuchElementException.class);
+
+        assertThatThrownBy(() -> 
jobInfo.getRestoreJobId("non-existent-cluster"))
+        .isInstanceOf(NoSuchElementException.class);
+    }
+
+    @Test
+    void 
testGetRestoreJobIdFallbackToGetAnyValueWhenClusterIdIsNullAndNotFound()
+    {
+        MultiClusterContainer<UUID> restoreJobIds = new 
MultiClusterContainer<>();
+        restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+        restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+        CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+        // getRestoreJobId() without parameters should fall back to 
getAnyValue()
+        UUID resultJobId = jobInfo.getRestoreJobId();
+        assertThat(resultJobId).isNotNull();
+        // Should be one of the two cluster job IDs
+        assertThat(resultJobId).isIn(cluster1JobId, cluster2JobId);
+    }
+
+    @Test
+    void testConstructorValidationFailsWithEmptyRestoreJobIds()
+    {
+        MultiClusterContainer<UUID> emptyContainer = new 
MultiClusterContainer<>();
+        BulkSparkConf mockConf = mock(BulkSparkConf.class);
+        TokenPartitioner mockPartitioner = mock(TokenPartitioner.class);
+        assertThatThrownBy(() -> new CassandraJobInfo(mockConf, 
emptyContainer, mockPartitioner))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("restoreJobIds cannot be empty");
+    }
+
+    @Test
+    void testGetRestoreJobIdConsistencyAcrossMultipleCalls()
+    {
+        UUID expectedJobId = UUID.randomUUID();
+        MultiClusterContainer<UUID> restoreJobIds = 
MultiClusterContainer.ofSingle(expectedJobId);
+        CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+        // Multiple calls should return the same UUID
+        Set<UUID> results = new HashSet<>();
+        results.add(jobInfo.getRestoreJobId());
+        results.add(jobInfo.getRestoreJobId());
+        results.add(jobInfo.getRestoreJobId(null));
+        assertThat(results).hasSize(1);
+    }
+
+    @Test
+    void testGetRestoreJobIdWithMultiClusterConsistency()
+    {
+        MultiClusterContainer<UUID> restoreJobIds = new 
MultiClusterContainer<>();
+        restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+        restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+        CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+        // Multiple calls for same cluster should return same UUID
+        
assertThat(jobInfo.getRestoreJobId(CLUSTER_1)).isEqualTo(cluster1JobId);
+        
assertThat(jobInfo.getRestoreJobId(CLUSTER_1)).isEqualTo(cluster1JobId);
+        
assertThat(jobInfo.getRestoreJobId(CLUSTER_2)).isEqualTo(cluster2JobId);
+        
assertThat(jobInfo.getRestoreJobId(CLUSTER_2)).isEqualTo(cluster2JobId);
+        // Fallback calls should be consistent within same instance
+        UUID fallback1 = jobInfo.getRestoreJobId();
+        UUID fallback2 = jobInfo.getRestoreJobId();
+        assertThat(fallback1).isEqualTo(fallback2);
+    }
+
+    private CassandraJobInfo createJobInfo(MultiClusterContainer<UUID> 
restoreJobIds)
+    {
+        BulkSparkConf mockConf = mock(BulkSparkConf.class);
+        TokenPartitioner mockPartitioner = mock(TokenPartitioner.class);
+        return new CassandraJobInfo(mockConf, restoreJobIds, mockPartitioner);
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index e9f86caa..d65db038 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -332,6 +333,12 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
         return jobId;
     }
 
+    @Override
+    public UUID getRestoreJobId(@Nullable String clusterId) throws 
NoSuchElementException
+    {
+        return jobId;
+    }
+
     @Override
     public String getConfiguredJobId()
     {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
index 987b5e12..539f625b 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
@@ -201,7 +201,7 @@ class CloudStorageStreamSessionTest
 
         MockBlobTransferApi(JobInfo jobInfo, SidecarClient sidecarClient, 
StorageClient storageClient)
         {
-            super(jobInfo, sidecarClient, storageClient);
+            super(jobInfo, sidecarClient, storageClient, null);
         }
 
         @Override
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
index dbc0d271..e4fd9616 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
@@ -130,7 +130,7 @@ class ImportCompletionCoordinatorTest
         writerValidator = new BulkWriteValidator(mockWriterContext, new 
MultiClusterReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner));
 
         // clients will not be used in this test class; mock is at the API 
method level
-        CloudStorageDataTransferApiImpl api = new 
CloudStorageDataTransferApiImpl(mockJobInfo, mock(SidecarClient.class), 
mock(StorageClient.class));
+        CloudStorageDataTransferApiImpl api = new 
CloudStorageDataTransferApiImpl(mockJobInfo, mock(SidecarClient.class), 
mock(StorageClient.class), null);
         dataTransferApi = spy(api);
 
         mockExtension = mock(StorageTransportExtension.class);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
index 75d47205..99e3b8ba 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
@@ -61,6 +61,7 @@ import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -72,7 +73,9 @@ import static org.mockito.Mockito.when;
 class CoordinatedImportCoordinatorTest
 {
     CoordinatedImportCoordinator coordinator;
-    UUID jobId;
+    UUID restoreJobId1 = UUID.randomUUID();
+    UUID restoreJobId2 = UUID.randomUUID();
+    String jobId = restoreJobId1.toString();
     JobInfo mockJobInfo;
     CassandraCoordinatedBulkWriterContext mockWriterContext;
     StorageTransportExtension mockExtension;
@@ -89,9 +92,9 @@ class CoordinatedImportCoordinatorTest
     public void setup() throws Exception
     {
         mockJobInfo = mock(JobInfo.class);
-        jobId = UUID.randomUUID();
-        when(mockJobInfo.getId()).thenReturn(jobId.toString());
-        when(mockJobInfo.getRestoreJobId()).thenReturn(jobId);
+        when(mockJobInfo.getId()).thenReturn(jobId);
+        
when(mockJobInfo.getRestoreJobId(eq(clusterId1))).thenReturn(restoreJobId1);
+        
when(mockJobInfo.getRestoreJobId(eq(clusterId2))).thenReturn(restoreJobId2);
         when(mockJobInfo.qualifiedTableName()).thenReturn(new 
QualifiedTableName("testkeyspace", "testtable"));
         
when(mockJobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.LOCAL_QUORUM);
         when(mockJobInfo.jobKeepAliveMinutes()).thenReturn(-1);
@@ -137,14 +140,14 @@ class CoordinatedImportCoordinatorTest
         assertThat(coordinator.isStageReady()).isFalse();
         assertThat(coordinator.isImportReady()).isFalse();
         // signal stage ready
-        coordinator.onStageReady(jobId.toString());
+        coordinator.onStageReady(jobId);
         assertThat(coordinator.isStageReady()).isTrue();
 
         loopAssert(() -> stagedClusters.getAllValues().size() == 2, "waiting 
for all cluster to stage successfully");
         
assertThat(stagedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1, 
clusterId2);
 
         // signal apply read
-        coordinator.onImportReady(jobId.toString());
+        coordinator.onImportReady(jobId);
 
         loopAssert(() -> appliedClusters.getAllValues().size() == 2, "waiting 
for all cluster to import successfully");
         
assertThat(appliedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1,
 clusterId2);
@@ -165,7 +168,7 @@ class CoordinatedImportCoordinatorTest
         CompletableFuture<Void> fut = 
CompletableFuture.runAsync(coordinator::await);
 
         // signal stage ready
-        coordinator.onStageReady(jobId.toString());
+        coordinator.onStageReady(jobId);
 
         loopAssert(() -> coordinator.failure() != null, "waiting for 
coordinator to fail");
         assertThat(coordinator.succeeded()).isFalse();
@@ -196,7 +199,7 @@ class CoordinatedImportCoordinatorTest
         CompletableFuture.runAsync(coordinator::await);
 
         // signal stage ready
-        coordinator.onStageReady(jobId.toString());
+        coordinator.onStageReady(jobId);
 
         loopAssert(() -> coordinator.failure() != null, "waiting for 
coordinator to fail");
         assertThat(coordinator.succeeded()).isFalse();
@@ -204,7 +207,7 @@ class CoordinatedImportCoordinatorTest
         .isExactlyInstanceOf(ImportFailedException.class)
         .hasRootCauseExactlyInstanceOf(ConsistencyNotSatisfiedException.class)
         .hasRootCauseMessage("Some of the token ranges cannot satisfy with 
consistency level. " +
-                             "job=" + jobId +
+                             "job=" + restoreJobId1 +
                              " phase=STAGE_READY consistencyLevel=LOCAL_QUORUM 
clusterId=cluster1 ranges=null");
     }
 
@@ -224,13 +227,13 @@ class CoordinatedImportCoordinatorTest
         CompletableFuture.runAsync(coordinator::await);
 
         // signal stage ready
-        coordinator.onStageReady(jobId.toString());
+        coordinator.onStageReady(jobId);
 
         loopAssert(() -> stagedClusters.getAllValues().size() == 2, "waiting 
for all cluster to stage successfully");
         
assertThat(stagedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1, 
clusterId2);
 
         // signal apply read
-        coordinator.onImportReady(jobId.toString());
+        coordinator.onImportReady(jobId);
 
         loopAssert(() -> coordinator.failure() != null, "waiting for 
coordinator to fail");
         assertThat(coordinator.succeeded()).isFalse();
@@ -238,7 +241,7 @@ class CoordinatedImportCoordinatorTest
         .isExactlyInstanceOf(ImportFailedException.class)
         .hasRootCauseExactlyInstanceOf(ConsistencyNotSatisfiedException.class)
         .hasRootCauseMessage("Some of the token ranges cannot satisfy with 
consistency level. " +
-                             "job=" + jobId +
+                             "job=" + restoreJobId1 +
                              " phase=IMPORT_READY 
consistencyLevel=LOCAL_QUORUM clusterId=cluster1 ranges=null");
     }
 
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
index b449354f..555fb289 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
@@ -19,12 +19,16 @@
 
 package org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.NoSuchElementException;
+import java.util.UUID;
 
 import org.junit.jupiter.api.Test;
 
+import org.apache.cassandra.spark.utils.SerializationUtils;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
@@ -189,6 +193,45 @@ class MultiClusterContainerTest
         });
     }
 
+    @Test
+    void testSerializationAndDeserializationWithSingleClusterContainer() 
throws IOException, ClassNotFoundException
+    {
+        UUID expectedJobId = UUID.randomUUID();
+        MultiClusterContainer<UUID> container = 
MultiClusterContainer.ofSingle(expectedJobId);
+
+        byte[] serialized = SerializationUtils.serialize(container);
+        MultiClusterContainer<UUID> deserializedContainer = 
SerializationUtils.deserialize(serialized, MultiClusterContainer.class);
+
+        // Verify deserialized container works correctly
+        
assertThat(deserializedContainer.getValueOrNull(null)).isEqualTo(expectedJobId);
+        assertThat(deserializedContainer.size()).isOne();
+        
assertThat(deserializedContainer.getAnyValue()).isEqualTo(expectedJobId);
+    }
+
+    @Test
+    void testSerializationAndDeserializationWithMultiClusterContainer() throws 
IOException, ClassNotFoundException
+    {
+        UUID cluster1JobId = UUID.randomUUID();
+        UUID cluster2JobId = UUID.randomUUID();
+
+        MultiClusterContainer<UUID> container = new MultiClusterContainer<>();
+        container.setValue("cluster1", cluster1JobId);
+        container.setValue("cluster2", cluster2JobId);
+
+        byte[] serialized = SerializationUtils.serialize(container);
+        MultiClusterContainer<UUID> deserializedContainer = 
SerializationUtils.deserialize(serialized, MultiClusterContainer.class);
+
+        // Verify deserialized container works correctly
+        
assertThat(deserializedContainer.getValueOrNull("cluster1")).isEqualTo(cluster1JobId);
+        
assertThat(deserializedContainer.getValueOrNull("cluster2")).isEqualTo(cluster2JobId);
+        assertThat(deserializedContainer.getValueOrNull(null)).isNull();
+        assertThat(deserializedContainer.size()).isEqualTo(2);
+
+        // Verify getAnyValue returns one of the values
+        UUID anyValue = deserializedContainer.getAnyValue();
+        assertThat(anyValue).isIn(cluster1JobId, cluster2JobId);
+    }
+
     private static class Value
     {
         int a = 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to