This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2210e8be CASSANALYTICS-81: Unique restoreJobId per write target (#134)
2210e8be is described below
commit 2210e8be93b162bb201ea0b89131ea555bd4e7e1
Author: Yifan Cai <[email protected]>
AuthorDate: Thu Jul 31 16:36:22 2025 -0700
CASSANALYTICS-81: Unique restoreJobId per write target (#134)
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANALYTICS-81
---
CHANGES.txt | 1 +
.../sidecar/common/data/RestoreJobConstants.java | 2 +-
.../data/CreateRestoreJobRequestPayload.java | 2 +-
.../bulkwriter/AbstractBulkWriterContext.java | 12 +-
.../bulkwriter/CassandraBulkSourceRelation.java | 32 +++--
.../bulkwriter/CassandraBulkWriterContext.java | 9 ++
.../spark/bulkwriter/CassandraJobInfo.java | 28 +++-
.../apache/cassandra/spark/bulkwriter/JobInfo.java | 12 ++
.../CassandraCloudStorageTransportContext.java | 3 +-
.../CloudStorageDataTransferApiFactory.java | 13 +-
.../CloudStorageDataTransferApiImpl.java | 34 +++--
.../cloudstorage/CloudStorageStreamSession.java | 2 +-
.../CassandraCoordinatedBulkWriterContext.java | 10 ++
.../CoordinatedCloudStorageDataTransferApi.java | 12 +-
.../coordinated/CoordinatedImportCoordinator.java | 6 +-
.../coordinated/MultiClusterContainer.java | 7 +
.../spark/bulkwriter/CassandraJobInfoTest.java | 143 +++++++++++++++++++++
.../spark/bulkwriter/MockBulkWriterContext.java | 7 +
.../CloudStorageStreamSessionTest.java | 2 +-
.../ImportCompletionCoordinatorTest.java | 2 +-
.../CoordinatedImportCoordinatorTest.java | 27 ++--
.../coordinated/MultiClusterContainerTest.java | 43 +++++++
22 files changed, 350 insertions(+), 59 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 43411d1b..fdc8fed0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Unique restoreJobId per write target (CASSANALYTICS-81)
* Allow writing to local datacenter only for coordinated write
(CASSANALYTICS-75)
0.1.0
diff --git
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
index 3b665dae..a6f76215 100644
---
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
+++
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
@@ -44,7 +44,7 @@ public class RestoreJobConstants
* datacenter, but the table replicates to multiple datacenters, it could
cause a large amount of repair streaming
* traffic. Arguably, you might want to leverage the intra-node repair
feature, then you can enable this option.
* Another use case that could justify the option is running distinct
restore jobs, one per datacenter, concurrently.
- * In this case, there is an external coordinator that manges the restore
job in each datacenter.
+ * In this case, there is an external coordinator that manages the restore
job in each datacenter.
*/
public static final String JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY =
"restoreToLocalDatacenterOnly";
public static final String SLICE_ID = "sliceId";
diff --git
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
index 69142223..0e7fc1e3 100644
---
a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
+++
b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
@@ -76,7 +76,7 @@ public class CreateRestoreJobRequestPayload
* CreateRestoreJobRequest deserializer
*
* @param jobId job id of restore job
- * @param jobAgent arbitrary text a job can put, which can be used
to identity itself during Http request
+ * @param jobAgent arbitrary text a job can put, which can be used
to identify itself during Http request
* @param secrets secrets to be used by restore job to download
data
* @param importOptions the configured options for SSTable import
* @param expireAtInMillis a timestamp in the future when the job is
considered expired
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index f464642c..87e5a979 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -32,6 +32,7 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
import org.apache.cassandra.spark.common.stats.LogStatsPublisher;
@@ -104,14 +105,21 @@ public abstract class AbstractBulkWriterContext
implements BulkWriterContext, Kr
validateKeyspaceReplication();
BulkSparkConf conf = bulkSparkConf();
TokenRangeMapping<RingInstance> tokenRangeMapping =
cluster().getTokenRangeMapping(true);
- UUID restoreJobId = bridge().getTimeUUID(); // used for creating
restore job on sidecar
TokenPartitioner tokenPartitioner = new
TokenPartitioner(tokenRangeMapping,
conf.numberSplits,
sparkDefaultParallelism(),
conf.getCores());
- return new CassandraJobInfo(conf, restoreJobId, tokenPartitioner);
+ return new CassandraJobInfo(conf, generateRestoreJobIds(),
tokenPartitioner);
}
+ /**
+ * Generate the restore job IDs used in the receiving Cassandra Sidecar
clusters.
+ * In the coordinated write mode, there should be a unique uuid per
cluster;
+ * In the single cluster write mode, the MultiClusterContainer would
contain one single entry.
+ * @return restore job ids that are unique per cluster
+ */
+ protected abstract MultiClusterContainer<UUID> generateRestoreJobIds();
+
protected CassandraBridge buildCassandraBridge()
{
return CassandraBridgeFactory.get(lowestCassandraVersion());
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index c5e255b3..40d1b330 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -41,11 +40,11 @@ import
o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobRequestPa
import
o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageDataTransferApi;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult;
-import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
-import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import
org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
@@ -279,7 +278,11 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
context.transportExtensionImplementation()
.onAllObjectsPersisted(objectCount, rowCount,
elapsedTimeMillis());
- setSliceCountForRestoreJob(context, objectCount);
+ // only need to set the slice count (for better progress tracking)
when in coordinated write mode
+ if (writerContext.job().isCoordinatedWriteEnabled())
+ {
+ setSliceCountForRestoreJob(context, objectCount);
+ }
awaitImportCompletion(context, resultsAsCloudStorageStreamResults);
markRestoreJobAsSucceeded(context);
@@ -460,7 +463,7 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
.toRestoreJobSecrets();
CoordinatedWriteConf.ClusterConf cluster =
coordinatedWriteConf.cluster(clusterId);
String localDc = cluster.resolveLocalDc(cl); // resolve the
cluster specific localDc name
- CreateRestoreJobRequestPayload payload =
createJobPayloadBuilder(job, secrets)
+ CreateRestoreJobRequestPayload payload =
createJobPayloadBuilder(job, secrets, clusterId)
.consistencyLevel(toSidecarConsistencyLevel(cl), localDc)
// TODO: add test case
once we advance the sidecar version that understands the flag,
// or a better
testing strategy is figured out
@@ -471,10 +474,15 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
}
private CreateRestoreJobRequestPayload.Builder
createJobPayloadBuilder(JobInfo job, RestoreJobSecrets secrets)
+ {
+ return createJobPayloadBuilder(job, secrets, null);
+ }
+
+ private CreateRestoreJobRequestPayload.Builder
createJobPayloadBuilder(JobInfo job, RestoreJobSecrets secrets, String
clusterId)
{
CreateRestoreJobRequestPayload.Builder builder =
CreateRestoreJobRequestPayload.builder(secrets, updatedLeaseTime());
builder.jobAgent(BuildInfo.APPLICATION_NAME)
- .jobId(job.getRestoreJobId())
+ .jobId(job.getRestoreJobId(clusterId))
.updateImportOptions(importOptions -> {
importOptions.verifySSTables(true) // we disallow the
end-user to bypass the non-extended verify anymore
.extendedVerify(false); // always turn off
@@ -490,15 +498,14 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
private void
setSliceCountForRestoreJob(TransportContext.CloudStorageTransportContext
context, long sliceCount)
{
UpdateRestoreJobRequestPayload requestPayload =
UpdateRestoreJobRequestPayload.builder().withSliceCount(sliceCount).build();
- UUID jobId = writerContext.job().getRestoreJobId();
try
{
- LOGGER.info("Setting slice count for the restore job. jobId={}
sliceCount={}", jobId, sliceCount);
+ LOGGER.info("Setting slice count for the restore job.
sliceCount={}", sliceCount);
context.dataTransferApi().updateRestoreJob(requestPayload);
}
catch (Exception e)
{
- LOGGER.warn("Failed to set slice count for the restore job.
jobId={}", jobId, e);
+ LOGGER.warn("Failed to set slice count for the restore job.", e);
// Do not rethrow - avoid triggering the catch block at the
call-site that marks job as failed.
}
}
@@ -506,17 +513,16 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
private void
markRestoreJobAsSucceeded(TransportContext.CloudStorageTransportContext context)
{
UpdateRestoreJobRequestPayload requestPayload =
UpdateRestoreJobRequestPayload.builder().withStatus(RestoreJobStatus.SUCCEEDED).build();
- UUID jobId = writerContext.job().getRestoreJobId();
try
{
- LOGGER.info("Marking the restore job as succeeded. jobId={}",
jobId);
+ LOGGER.info("Marking the restore job as succeeded.");
// Prioritize the call to extension, so onJobSucceeded is always
invoked.
context.transportExtensionImplementation().onJobSucceeded(elapsedTimeMillis());
context.dataTransferApi().updateRestoreJob(requestPayload);
}
catch (Exception e)
{
- LOGGER.warn("Failed to mark the restore job as succeeded.
jobId={}", jobId, e);
+ LOGGER.warn("Failed to mark the restore job as succeeded.", e);
// Do not rethrow - avoid triggering the catch block at the
call-site that marks job as failed.
}
}
@@ -525,8 +531,6 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
{
// Prioritize the call to extension, so onJobFailed is always invoked.
context.transportExtensionImplementation().onJobFailed(elapsedTimeMillis(),
cause);
- UUID jobId = writerContext.job().getRestoreJobId();
- LOGGER.info("Aborting job. jobId={}", jobId);
context.dataTransferApi().abortRestoreJob();
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index debf0eb4..2eea6276 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -19,9 +19,12 @@
package org.apache.cassandra.spark.bulkwriter;
+import java.util.UUID;
+
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
@@ -57,4 +60,10 @@ public class CassandraBulkWriterContext extends
AbstractBulkWriterContext
&&
cluster().replicationFactor().getOptions().containsKey(conf.localDC);
Preconditions.checkState(isReplicatedToLocalDc, "Keyspace %s is not
replicated on datacenter %s", conf.keyspace, conf.localDC);
}
+
+ @Override
+ protected MultiClusterContainer<UUID> generateRestoreJobIds()
+ {
+ return MultiClusterContainer.ofSingle(bridge().getTimeUUID());
+ }
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 3759ea1c..27b8eb45 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -19,11 +19,14 @@
package org.apache.cassandra.spark.bulkwriter;
+import java.util.NoSuchElementException;
import java.util.UUID;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.data.QualifiedTableName;
+import org.apache.cassandra.spark.utils.Preconditions;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -31,12 +34,14 @@ public class CassandraJobInfo implements JobInfo
{
private static final long serialVersionUID = 6140098484732683759L;
protected final BulkSparkConf conf;
- protected final UUID restoreJobId;
+ // restoreJobId per cluster; it is guaranteed to be non-empty
+ protected final MultiClusterContainer<UUID> restoreJobIds;
protected final TokenPartitioner tokenPartitioner;
- public CassandraJobInfo(BulkSparkConf conf, UUID restoreJobId,
TokenPartitioner tokenPartitioner)
+ public CassandraJobInfo(BulkSparkConf conf, MultiClusterContainer<UUID>
restoreJobIds, TokenPartitioner tokenPartitioner)
{
- this.restoreJobId = restoreJobId;
+ Preconditions.checkArgument(restoreJobIds.size() > 0, "restoreJobIds
cannot be empty");
+ this.restoreJobIds = restoreJobIds;
this.conf = conf;
this.tokenPartitioner = tokenPartitioner;
}
@@ -128,7 +133,22 @@ public class CassandraJobInfo implements JobInfo
@Override
public UUID getRestoreJobId()
{
- return restoreJobId;
+ // get the id assume it is single cluster scenario; using `null` to
retrieve.
+ try
+ {
+ return getRestoreJobId(null);
+ }
+ catch (NoSuchElementException nsee)
+ {
+ // if it is multi-cluster scenario, id would be null; in this
case, just retrieve a value
+ return restoreJobIds.getAnyValue();
+ }
+ }
+
+ @Override
+ public UUID getRestoreJobId(@Nullable String clusterId)
+ {
+ return restoreJobIds.getValueOrThrow(clusterId);
}
@Override
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index bbbb03b7..2c4a089e 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -20,6 +20,7 @@
package org.apache.cassandra.spark.bulkwriter;
import java.io.Serializable;
+import java.util.NoSuchElementException;
import java.util.UUID;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
@@ -52,10 +53,21 @@ public interface JobInfo extends Serializable
*/
UUID getRestoreJobId();
+ /**
+ * Returns the restore job identifier on Cassandra Sidecar of the cluster
identified by the clusterId
+ * The method should be called in the coordinated write code path.
+ *
+ * @param clusterId identifies the Cassandra cluster
+ * @return restore job identifier, a time-based uuid
+ * @throws NoSuchElementException when there is no restoreJobId associated
with the clusterId
+ */
+ UUID getRestoreJobId(@Nullable String clusterId) throws
NoSuchElementException;
+
/**
* An optional unique identified supplied in spark configuration
* @return a id string or null
*/
+ @Nullable
String getConfiguredJobId();
// Convenient method to decide a unique identified used for the job.
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
index 5f881b9d..9184dd5a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraCloudStorageTransportContext.java
@@ -136,8 +136,7 @@ public class CassandraCloudStorageTransportContext
implements TransportContext.C
{
Class<StorageTransportExtension> clazz =
(Class<StorageTransportExtension>) Class.forName(transportExtensionClass);
StorageTransportExtension extension =
clazz.getConstructor().newInstance();
- LOGGER.info("Initializing storage transport extension. jobId={},
restoreJobId={}",
- jobInfo.getId(), jobInfo.getRestoreJobId());
+ LOGGER.info("Initializing storage transport extension. jobId={}",
jobInfo.getId());
extension.initialize(jobInfo.getId(), conf.getSparkConf(),
isOnDriver);
// Only assign when initialization is complete
// to avoid exposing uninitialized extension object, which leads
to unexpected behavior
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
index a20efb9f..192c045e 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.JobInfo;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
+import org.jetbrains.annotations.Nullable;
public class CloudStorageDataTransferApiFactory
{
@@ -46,15 +47,19 @@ public class CloudStorageDataTransferApiFactory
}
else
{
- return createForSingleCluster(storageClient, jobInfo, clusterInfo);
+ return createForSingleCluster(storageClient, jobInfo, clusterInfo,
null);
}
}
- private CloudStorageDataTransferApiImpl
createForSingleCluster(StorageClient storageClient, JobInfo jobInfo,
ClusterInfo clusterInfo)
+ private CloudStorageDataTransferApiImpl
createForSingleCluster(StorageClient storageClient,
+ JobInfo
jobInfo,
+ ClusterInfo
clusterInfo,
+ @Nullable
String clusterId)
{
return new CloudStorageDataTransferApiImpl(jobInfo,
clusterInfo.getCassandraContext().getSidecarClient(),
- storageClient);
+ storageClient,
+ clusterId);
}
private CoordinatedCloudStorageDataTransferApi
createForCoordinated(StorageClient storageClient,
@@ -63,7 +68,7 @@ public class CloudStorageDataTransferApiFactory
{
Map<String, CloudStorageDataTransferApiImpl> apiByClusterId = new
HashMap<>(clusterInfoGroup.size());
clusterInfoGroup.forEach((clusterId, clusterInfo) -> {
- apiByClusterId.put(clusterId,
createForSingleCluster(storageClient, jobInfo, clusterInfo));
+ apiByClusterId.put(clusterId,
createForSingleCluster(storageClient, jobInfo, clusterInfo, clusterId));
});
return new CoordinatedCloudStorageDataTransferApi(apiByClusterId);
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
index 3c44ccdb..b09cca5a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiImpl.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.spark.bulkwriter.cloudstorage;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
@@ -42,6 +43,7 @@ import org.apache.cassandra.spark.data.QualifiedTableName;
import org.apache.cassandra.spark.exception.S3ApiCallException;
import org.apache.cassandra.spark.exception.SidecarApiCallException;
import org.apache.cassandra.spark.transports.storage.StorageCredentials;
+import org.jetbrains.annotations.Nullable;
/**
* Encapsulates transfer APIs used by {@link CloudStorageStreamSession}.
{@link StorageClient} is used to interact with S3 and
@@ -49,15 +51,20 @@ import
org.apache.cassandra.spark.transports.storage.StorageCredentials;
*/
public class CloudStorageDataTransferApiImpl implements
CloudStorageDataTransferApi
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CloudStorageDataTransferApiImpl.class);
+
+ @Nullable
+ private final String clusterId;
private final JobInfo jobInfo;
private final SidecarClient sidecarClient;
private final StorageClient storageClient;
- public CloudStorageDataTransferApiImpl(JobInfo jobInfo, SidecarClient
sidecarClient, StorageClient storageClient)
+ public CloudStorageDataTransferApiImpl(JobInfo jobInfo, SidecarClient
sidecarClient, StorageClient storageClient, @Nullable String clusterId)
{
this.jobInfo = jobInfo;
this.sidecarClient = sidecarClient;
this.storageClient = storageClient;
+ this.clusterId = clusterId;
}
public JobInfo jobInfo()
@@ -91,6 +98,7 @@ public class CloudStorageDataTransferApiImpl implements
CloudStorageDataTransfer
@Override
public void createRestoreJob(CreateRestoreJobRequestPayload
createRestoreJobRequestPayload) throws SidecarApiCallException
{
+ UUID jobId = jobInfo.getRestoreJobId(clusterId);
try
{
QualifiedTableName qualifiedTableName =
jobInfo.qualifiedTableName();
@@ -101,24 +109,25 @@ public class CloudStorageDataTransferApiImpl implements
CloudStorageDataTransfer
catch (Exception exception)
{
handleInterruption(exception);
- throw new SidecarApiCallException("Failed to create a new restore
job. restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+ throw new SidecarApiCallException("Failed to create a new restore
job. restoreJobId=" + jobId, exception);
}
}
@Override
public RestoreJobSummaryResponsePayload restoreJobSummary() throws
SidecarApiCallException
{
+ UUID jobId = jobInfo.getRestoreJobId(clusterId);
try
{
QualifiedTableName qualifiedTableName =
jobInfo.qualifiedTableName();
return
sidecarClient.restoreJobSummary(qualifiedTableName.keyspace(),
qualifiedTableName.table(),
-
jobInfo.getRestoreJobId()).get();
+ jobId).get();
}
catch (Exception exception)
{
handleInterruption(exception);
- throw new SidecarApiCallException("Failed to retrieve restore job
summary. restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+ throw new SidecarApiCallException("Failed to retrieve restore job
summary. restoreJobId=" + jobId, exception);
}
}
@@ -126,6 +135,7 @@ public class CloudStorageDataTransferApiImpl implements
CloudStorageDataTransfer
public void createRestoreSliceFromExecutor(SidecarInstance sidecarInstance,
CreateSliceRequestPayload
createSliceRequestPayload) throws SidecarApiCallException
{
+ UUID jobId = jobInfo.getRestoreJobId(clusterId);
try
{
createRestoreSliceWithCustomRetry(sidecarInstance,
createSliceRequestPayload, new ExecutorCreateSliceRetryPolicy(sidecarClient))
@@ -135,7 +145,7 @@ public class CloudStorageDataTransferApiImpl implements
CloudStorageDataTransfer
{
handleInterruption(exception);
throw new SidecarApiCallException("Failed to create restore slice.
" +
- "restoreJobId=" +
jobInfo.getRestoreJobId() +
+ "restoreJobId=" + jobId +
" payload=" +
createSliceRequestPayload,
exception);
}
@@ -152,35 +162,39 @@ public class CloudStorageDataTransferApiImpl implements
CloudStorageDataTransfer
@Override
public void updateRestoreJob(UpdateRestoreJobRequestPayload
updateRestoreJobRequestPayload) throws SidecarApiCallException
{
+ UUID jobId = jobInfo.getRestoreJobId(clusterId);
try
{
+ LOGGER.info("Updating the restore job. clusterId={}
restoreJobId={}", clusterId, jobId);
QualifiedTableName qualifiedTableName =
jobInfo.qualifiedTableName();
sidecarClient.updateRestoreJob(qualifiedTableName.keyspace(),
qualifiedTableName.table(),
- jobInfo.getRestoreJobId(),
+ jobId,
updateRestoreJobRequestPayload).get();
}
catch (Exception exception)
{
handleInterruption(exception);
- throw new SidecarApiCallException("Failed to update restore job.
restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+ throw new SidecarApiCallException("Failed to update restore job.
restoreJobId=" + jobId, exception);
}
}
@Override
public void abortRestoreJob() throws SidecarApiCallException
{
+ UUID jobId = jobInfo.getRestoreJobId(clusterId);
try
{
+ LOGGER.info("Abort job. clusterId={} restoreJobId={}", clusterId,
jobId);
QualifiedTableName qualifiedTableName =
jobInfo.qualifiedTableName();
sidecarClient.abortRestoreJob(qualifiedTableName.keyspace(),
qualifiedTableName.table(),
- jobInfo.getRestoreJobId()).get();
+ jobId).get();
}
catch (Exception exception)
{
handleInterruption(exception);
- throw new SidecarApiCallException("Failed to abort restore job.
restoreJobId=" + jobInfo.getRestoreJobId(), exception);
+ throw new SidecarApiCallException("Failed to abort restore job.
restoreJobId=" + jobId, exception);
}
}
@@ -194,7 +208,7 @@ public class CloudStorageDataTransferApiImpl implements
CloudStorageDataTransfer
QualifiedTableName qualifiedTableName = jobInfo.qualifiedTableName();
CreateRestoreJobSliceRequest request = new
CreateRestoreJobSliceRequest(qualifiedTableName.keyspace(),
qualifiedTableName.table(),
-
jobInfo.getRestoreJobId(),
+
jobInfo.getRestoreJobId(clusterId),
createSliceRequestPayload);
return sidecarClient.executeRequestAsync(sidecarClient.requestBuilder()
.retryPolicy(retryPolicy)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
index 964c4edf..0dcebbaf 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
@@ -95,7 +95,7 @@ public class CloudStorageStreamSession extends
StreamSession<TransportContext.Cl
JobInfo job = bulkWriterContext.job();
long maxSizePerBundleInBytes =
job.transportInfo().getMaxSizePerBundleInBytes();
- this.bundleNameGenerator = new
BundleNameGenerator(job.getRestoreJobId().toString(), sessionID);
+ this.bundleNameGenerator = new BundleNameGenerator(job.getId(),
sessionID);
this.dataTransferApi = transportContext.dataTransferApi();
this.bridge = bridge;
QualifiedTableName qualifiedTableName = job.qualifiedTableName();
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
index a6722e30..70fa59c5 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
@@ -19,6 +19,8 @@
package org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated;
+import java.util.UUID;
+
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
@@ -82,6 +84,14 @@ public class CassandraCoordinatedBulkWriterContext extends
AbstractBulkWriterCon
});
}
+ @Override
+ protected MultiClusterContainer<UUID> generateRestoreJobIds()
+ {
+ MultiClusterContainer<UUID> result = new MultiClusterContainer<>();
+ clusterInfoGroup().forEach((clusterId, ignored) ->
result.setValue(clusterId, bridge().getTimeUUID()));
+ return result;
+ }
+
protected CassandraClusterInfoGroup clusterInfoGroup()
{
return (CassandraClusterInfoGroup) cluster();
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
index 81a93895..a56d896f 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedCloudStorageDataTransferApi.java
@@ -121,7 +121,7 @@ public class CoordinatedCloudStorageDataTransferApi
implements CloudStorageDataT
@Override
public void createRestoreSliceFromExecutor(String clusterId,
CreateSliceRequestPayload createSliceRequestPayload) throws
SidecarApiCallException
{
-
createRestoreSliceInternal(dataTransferApis.getValueOrThrow(clusterId),
createSliceRequestPayload);
+ createRestoreSliceInternal(clusterId,
dataTransferApis.getValueOrThrow(clusterId), createSliceRequestPayload);
}
@Override
@@ -161,15 +161,17 @@ public class CoordinatedCloudStorageDataTransferApi
implements CloudStorageDataT
throw new UnsupportedOperationException("Not supported for coordinated
write");
}
- private void createRestoreSliceInternal(CloudStorageDataTransferApiImpl
dataTransferApi,
+ private void createRestoreSliceInternal(String clusterId,
+ CloudStorageDataTransferApiImpl
dataTransferApi,
CreateSliceRequestPayload
createSliceRequestPayload) throws SidecarApiCallException
{
JobInfo jobInfo = dataTransferApi.jobInfo();
SidecarClient sidecarClient = dataTransferApi.sidecarClient();
QualifiedTableName qualifiedTableName = jobInfo.qualifiedTableName();
+ UUID restoreJobId = jobInfo.getRestoreJobId(clusterId);
CreateRestoreJobSliceRequest request = new
CreateRestoreJobSliceRequest(qualifiedTableName.keyspace(),
qualifiedTableName.table(),
-
jobInfo.getRestoreJobId(),
+
restoreJobId,
createSliceRequestPayload);
RetryPolicy retryPolicy = new
CloudStorageDataTransferApiImpl.ExecutorCreateSliceRetryPolicy(sidecarClient);
RequestContext requestContext =
sidecarClient.requestBuilder().retryPolicy(retryPolicy).request(request).build();
@@ -181,7 +183,7 @@ public class CoordinatedCloudStorageDataTransferApi
implements CloudStorageDataT
{
handleInterruption(exception);
throw new SidecarApiCallException("Failed to create restore job
slice. " +
- "restoreJobId=" +
jobInfo.getRestoreJobId() +
+ "restoreJobId=" + restoreJobId +
" sliceId=" +
createSliceRequestPayload.sliceId(),
exception);
}
@@ -194,7 +196,7 @@ public class CoordinatedCloudStorageDataTransferApi
implements CloudStorageDataT
{
JobInfo jobInfo = dataTransferApi.jobInfo();
QualifiedTableName qualifiedTableName = jobInfo.qualifiedTableName();
- UUID restoreJobId = jobInfo.getRestoreJobId();
+ UUID restoreJobId = jobInfo.getRestoreJobId(clusterId);
RestoreJobProgressRequestParams requestParams = new
RestoreJobProgressRequestParams(qualifiedTableName.keyspace(),
qualifiedTableName.table(),
restoreJobId,
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
index f7286ae2..80639d30 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinator.java
@@ -227,7 +227,11 @@ public class CoordinatedImportCoordinator implements
ImportCoordinator, Coordina
completedClusters.add(clusterId);
String error = String.format("Some of the token ranges cannot satisfy
with consistency level. " +
"job=%s phase=%s consistencyLevel=%s
clusterId=%s ranges=%s",
- job.getRestoreJobId(),
determineReadyPhase(), job.getConsistencyLevel(), clusterId,
progress.failedRanges());
+ job.getRestoreJobId(clusterId),
+ determineReadyPhase(),
+ job.getConsistencyLevel(),
+ clusterId,
+ progress.failedRanges());
LOGGER.error(error);
ConsistencyNotSatisfiedException exception = new
ConsistencyNotSatisfiedException(error);
failureHandler.accept(clusterId, exception);
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
index 4a9e58ac..a88b1cbc 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainer.java
@@ -48,6 +48,13 @@ public class MultiClusterContainer<T> implements
Serializable, MultiClusterSuppo
// For coordinated write, the key should be String values of clusterId
private final Map<Object, T> byCluster = new ConcurrentHashMap<>();
+ public static <T> MultiClusterContainer<T> ofSingle(T value)
+ {
+ MultiClusterContainer<T> result = new MultiClusterContainer<>();
+ result.setValue(null, value);
+ return result;
+ }
+
@Nullable
@Override
public T getValueOrNull(@Nullable String clusterId)
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfoTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfoTest.java
new file mode 100644
index 00000000..012fd7a4
--- /dev/null
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfoTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for CassandraJobInfo class focusing on restore job ID handling
+ * in single and multi-cluster scenarios.
+ */
+class CassandraJobInfoTest
+{
+ private static final String CLUSTER_1 = "cluster1";
+ private static final String CLUSTER_2 = "cluster2";
+ private final UUID cluster1JobId = UUID.randomUUID();
+ private final UUID cluster2JobId = UUID.randomUUID();
+
+ @Test
+ void testGetRestoreJobIdInSingleClusterScenario()
+ {
+ MultiClusterContainer<UUID> restoreJobIds =
MultiClusterContainer.ofSingle(cluster1JobId);
+ CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+ assertThat(jobInfo.getRestoreJobId()).isEqualTo(cluster1JobId);
+ assertThat(jobInfo.getRestoreJobId(null)).isEqualTo(cluster1JobId);
+ }
+
+ @Test
+ void testGetRestoreJobIdWithClusterIdReturnsCorrectUuidForSpecificCluster()
+ {
+ MultiClusterContainer<UUID> restoreJobIds = new
MultiClusterContainer<>();
+ restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+ restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+ CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+
assertThat(jobInfo.getRestoreJobId(CLUSTER_1)).isEqualTo(cluster1JobId);
+
assertThat(jobInfo.getRestoreJobId(CLUSTER_2)).isEqualTo(cluster2JobId);
+ }
+
+ @Test
+ void testGetRestoreJobIdThrowsNoSuchElement()
+ {
+ MultiClusterContainer<UUID> restoreJobIds = new
MultiClusterContainer<>();
+ restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+ restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+ CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+ assertThatThrownBy(() -> jobInfo.getRestoreJobId(null))
+ .isInstanceOf(NoSuchElementException.class);
+
+ assertThatThrownBy(() ->
jobInfo.getRestoreJobId("non-existent-cluster"))
+ .isInstanceOf(NoSuchElementException.class);
+ }
+
+ @Test
+ void
testGetRestoreJobIdFallbackToGetAnyValueWhenClusterIdIsNullAndNotFound()
+ {
+ MultiClusterContainer<UUID> restoreJobIds = new
MultiClusterContainer<>();
+ restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+ restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+ CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+ // getRestoreJobId() without parameters should fall back to
getAnyValue()
+ UUID resultJobId = jobInfo.getRestoreJobId();
+ assertThat(resultJobId).isNotNull();
+ // Should be one of the two cluster job IDs
+ assertThat(resultJobId).isIn(cluster1JobId, cluster2JobId);
+ }
+
+ @Test
+ void testConstructorValidationFailsWithEmptyRestoreJobIds()
+ {
+ MultiClusterContainer<UUID> emptyContainer = new
MultiClusterContainer<>();
+ BulkSparkConf mockConf = mock(BulkSparkConf.class);
+ TokenPartitioner mockPartitioner = mock(TokenPartitioner.class);
+ assertThatThrownBy(() -> new CassandraJobInfo(mockConf,
emptyContainer, mockPartitioner))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("restoreJobIds cannot be empty");
+ }
+
+ @Test
+ void testGetRestoreJobIdConsistencyAcrossMultipleCalls()
+ {
+ UUID expectedJobId = UUID.randomUUID();
+ MultiClusterContainer<UUID> restoreJobIds =
MultiClusterContainer.ofSingle(expectedJobId);
+ CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+ // Multiple calls should return the same UUID
+ Set<UUID> results = new HashSet<>();
+ results.add(jobInfo.getRestoreJobId());
+ results.add(jobInfo.getRestoreJobId());
+ results.add(jobInfo.getRestoreJobId(null));
+ assertThat(results).hasSize(1);
+ }
+
+ @Test
+ void testGetRestoreJobIdWithMultiClusterConsistency()
+ {
+ MultiClusterContainer<UUID> restoreJobIds = new
MultiClusterContainer<>();
+ restoreJobIds.setValue(CLUSTER_1, cluster1JobId);
+ restoreJobIds.setValue(CLUSTER_2, cluster2JobId);
+ CassandraJobInfo jobInfo = createJobInfo(restoreJobIds);
+ // Multiple calls for same cluster should return same UUID
+
assertThat(jobInfo.getRestoreJobId(CLUSTER_1)).isEqualTo(cluster1JobId);
+
assertThat(jobInfo.getRestoreJobId(CLUSTER_1)).isEqualTo(cluster1JobId);
+
assertThat(jobInfo.getRestoreJobId(CLUSTER_2)).isEqualTo(cluster2JobId);
+
assertThat(jobInfo.getRestoreJobId(CLUSTER_2)).isEqualTo(cluster2JobId);
+ // Fallback calls should be consistent within same instance
+ UUID fallback1 = jobInfo.getRestoreJobId();
+ UUID fallback2 = jobInfo.getRestoreJobId();
+ assertThat(fallback1).isEqualTo(fallback2);
+ }
+
+ private CassandraJobInfo createJobInfo(MultiClusterContainer<UUID>
restoreJobIds)
+ {
+ BulkSparkConf mockConf = mock(BulkSparkConf.class);
+ TokenPartitioner mockPartitioner = mock(TokenPartitioner.class);
+ return new CassandraJobInfo(mockConf, restoreJobIds, mockPartitioner);
+ }
+}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index e9f86caa..d65db038 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -332,6 +333,12 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
return jobId;
}
+ @Override
+ public UUID getRestoreJobId(@Nullable String clusterId) throws
NoSuchElementException
+ {
+ return jobId;
+ }
+
@Override
public String getConfiguredJobId()
{
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
index 987b5e12..539f625b 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
@@ -201,7 +201,7 @@ class CloudStorageStreamSessionTest
MockBlobTransferApi(JobInfo jobInfo, SidecarClient sidecarClient,
StorageClient storageClient)
{
- super(jobInfo, sidecarClient, storageClient);
+ super(jobInfo, sidecarClient, storageClient, null);
}
@Override
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
index dbc0d271..e4fd9616 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java
@@ -130,7 +130,7 @@ class ImportCompletionCoordinatorTest
writerValidator = new BulkWriteValidator(mockWriterContext, new
MultiClusterReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner));
// clients will not be used in this test class; mock is at the API
method level
- CloudStorageDataTransferApiImpl api = new
CloudStorageDataTransferApiImpl(mockJobInfo, mock(SidecarClient.class),
mock(StorageClient.class));
+ CloudStorageDataTransferApiImpl api = new
CloudStorageDataTransferApiImpl(mockJobInfo, mock(SidecarClient.class),
mock(StorageClient.class), null);
dataTransferApi = spy(api);
mockExtension = mock(StorageTransportExtension.class);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
index 75d47205..99e3b8ba 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
@@ -61,6 +61,7 @@ import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -72,7 +73,9 @@ import static org.mockito.Mockito.when;
class CoordinatedImportCoordinatorTest
{
CoordinatedImportCoordinator coordinator;
- UUID jobId;
+ UUID restoreJobId1 = UUID.randomUUID();
+ UUID restoreJobId2 = UUID.randomUUID();
+ String jobId = restoreJobId1.toString();
JobInfo mockJobInfo;
CassandraCoordinatedBulkWriterContext mockWriterContext;
StorageTransportExtension mockExtension;
@@ -89,9 +92,9 @@ class CoordinatedImportCoordinatorTest
public void setup() throws Exception
{
mockJobInfo = mock(JobInfo.class);
- jobId = UUID.randomUUID();
- when(mockJobInfo.getId()).thenReturn(jobId.toString());
- when(mockJobInfo.getRestoreJobId()).thenReturn(jobId);
+ when(mockJobInfo.getId()).thenReturn(jobId);
+
when(mockJobInfo.getRestoreJobId(eq(clusterId1))).thenReturn(restoreJobId1);
+
when(mockJobInfo.getRestoreJobId(eq(clusterId2))).thenReturn(restoreJobId2);
when(mockJobInfo.qualifiedTableName()).thenReturn(new
QualifiedTableName("testkeyspace", "testtable"));
when(mockJobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.LOCAL_QUORUM);
when(mockJobInfo.jobKeepAliveMinutes()).thenReturn(-1);
@@ -137,14 +140,14 @@ class CoordinatedImportCoordinatorTest
assertThat(coordinator.isStageReady()).isFalse();
assertThat(coordinator.isImportReady()).isFalse();
// signal stage ready
- coordinator.onStageReady(jobId.toString());
+ coordinator.onStageReady(jobId);
assertThat(coordinator.isStageReady()).isTrue();
loopAssert(() -> stagedClusters.getAllValues().size() == 2, "waiting
for all cluster to stage successfully");
assertThat(stagedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1,
clusterId2);
    // signal apply ready
- coordinator.onImportReady(jobId.toString());
+ coordinator.onImportReady(jobId);
loopAssert(() -> appliedClusters.getAllValues().size() == 2, "waiting
for all cluster to import successfully");
assertThat(appliedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1,
clusterId2);
@@ -165,7 +168,7 @@ class CoordinatedImportCoordinatorTest
CompletableFuture<Void> fut =
CompletableFuture.runAsync(coordinator::await);
// signal stage ready
- coordinator.onStageReady(jobId.toString());
+ coordinator.onStageReady(jobId);
loopAssert(() -> coordinator.failure() != null, "waiting for
coordinator to fail");
assertThat(coordinator.succeeded()).isFalse();
@@ -196,7 +199,7 @@ class CoordinatedImportCoordinatorTest
CompletableFuture.runAsync(coordinator::await);
// signal stage ready
- coordinator.onStageReady(jobId.toString());
+ coordinator.onStageReady(jobId);
loopAssert(() -> coordinator.failure() != null, "waiting for
coordinator to fail");
assertThat(coordinator.succeeded()).isFalse();
@@ -204,7 +207,7 @@ class CoordinatedImportCoordinatorTest
.isExactlyInstanceOf(ImportFailedException.class)
.hasRootCauseExactlyInstanceOf(ConsistencyNotSatisfiedException.class)
.hasRootCauseMessage("Some of the token ranges cannot satisfy with
consistency level. " +
- "job=" + jobId +
+ "job=" + restoreJobId1 +
" phase=STAGE_READY consistencyLevel=LOCAL_QUORUM
clusterId=cluster1 ranges=null");
}
@@ -224,13 +227,13 @@ class CoordinatedImportCoordinatorTest
CompletableFuture.runAsync(coordinator::await);
// signal stage ready
- coordinator.onStageReady(jobId.toString());
+ coordinator.onStageReady(jobId);
loopAssert(() -> stagedClusters.getAllValues().size() == 2, "waiting
for all cluster to stage successfully");
assertThat(stagedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1,
clusterId2);
    // signal apply ready
- coordinator.onImportReady(jobId.toString());
+ coordinator.onImportReady(jobId);
loopAssert(() -> coordinator.failure() != null, "waiting for
coordinator to fail");
assertThat(coordinator.succeeded()).isFalse();
@@ -238,7 +241,7 @@ class CoordinatedImportCoordinatorTest
.isExactlyInstanceOf(ImportFailedException.class)
.hasRootCauseExactlyInstanceOf(ConsistencyNotSatisfiedException.class)
.hasRootCauseMessage("Some of the token ranges cannot satisfy with
consistency level. " +
- "job=" + jobId +
+ "job=" + restoreJobId1 +
" phase=IMPORT_READY
consistencyLevel=LOCAL_QUORUM clusterId=cluster1 ranges=null");
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
index b449354f..555fb289 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/MultiClusterContainerTest.java
@@ -19,12 +19,16 @@
package org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.NoSuchElementException;
+import java.util.UUID;
import org.junit.jupiter.api.Test;
+import org.apache.cassandra.spark.utils.SerializationUtils;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
@@ -189,6 +193,45 @@ class MultiClusterContainerTest
});
}
+ @Test
+ void testSerializationAndDeserializationWithSingleClusterContainer()
throws IOException, ClassNotFoundException
+ {
+ UUID expectedJobId = UUID.randomUUID();
+ MultiClusterContainer<UUID> container =
MultiClusterContainer.ofSingle(expectedJobId);
+
+ byte[] serialized = SerializationUtils.serialize(container);
+ MultiClusterContainer<UUID> deserializedContainer =
SerializationUtils.deserialize(serialized, MultiClusterContainer.class);
+
+ // Verify deserialized container works correctly
+
assertThat(deserializedContainer.getValueOrNull(null)).isEqualTo(expectedJobId);
+ assertThat(deserializedContainer.size()).isOne();
+
assertThat(deserializedContainer.getAnyValue()).isEqualTo(expectedJobId);
+ }
+
+ @Test
+ void testSerializationAndDeserializationWithMultiClusterContainer() throws
IOException, ClassNotFoundException
+ {
+ UUID cluster1JobId = UUID.randomUUID();
+ UUID cluster2JobId = UUID.randomUUID();
+
+ MultiClusterContainer<UUID> container = new MultiClusterContainer<>();
+ container.setValue("cluster1", cluster1JobId);
+ container.setValue("cluster2", cluster2JobId);
+
+ byte[] serialized = SerializationUtils.serialize(container);
+ MultiClusterContainer<UUID> deserializedContainer =
SerializationUtils.deserialize(serialized, MultiClusterContainer.class);
+
+ // Verify deserialized container works correctly
+
assertThat(deserializedContainer.getValueOrNull("cluster1")).isEqualTo(cluster1JobId);
+
assertThat(deserializedContainer.getValueOrNull("cluster2")).isEqualTo(cluster2JobId);
+ assertThat(deserializedContainer.getValueOrNull(null)).isNull();
+ assertThat(deserializedContainer.size()).isEqualTo(2);
+
+ // Verify getAnyValue returns one of the values
+ UUID anyValue = deserializedContainer.getAnyValue();
+ assertThat(anyValue).isIn(cluster1JobId, cluster2JobId);
+ }
+
private static class Value
{
int a = 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]