This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 52c1bb02 CASSSIDECAR-269: Allow restore jobs to restore to the local datacenter only (#234) 52c1bb02 is described below commit 52c1bb02376af4a29467d364b17a99fb70da53b3 Author: Yifan Cai <y...@apache.org> AuthorDate: Thu Jul 3 09:54:39 2025 -0700 CASSSIDECAR-269: Allow restore jobs to restore to the local datacenter only (#234) Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSSIDECAR-269 --- CHANGES.txt | 1 + .../sidecar/common/data/RestoreJobConstants.java | 11 ++++ .../data/CreateRestoreJobRequestPayload.java | 30 ++++++++++- .../CreateRestoreJobRequestPayloadTest.java | 31 ++++++++++- .../apache/cassandra/sidecar/db/RestoreJob.java | 31 +++++++++-- .../sidecar/db/RestoreJobDatabaseAccessor.java | 2 + .../sidecar/db/schema/RestoreJobsSchema.java | 8 ++- .../sidecar/restore/RestoreJobDiscoverer.java | 49 +++++++++++++++++ .../db/RestoreJobDatabaseAccessorIntTest.java | 32 +++++++++-- .../cassandra/sidecar/db/SidecarSchemaTest.java | 26 ++++----- .../sidecar/restore/RestoreJobDiscovererTest.java | 63 +++++++++++++++++++++- 11 files changed, 256 insertions(+), 28 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index df2abe09..cc436d6e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * Allow restore jobs to restore to the local datacenter only (CASSSIDECAR-269) * Avoid ending response in authentication handler, doesn't allow chaining of auth handlers (CASSSIDECAR-270) * Enhanced Sidecarclient to list Cassandra instance files and to download them for Live Migration (CASSSIDECAR-224) * Fix CdcRawDirectorySpaceCleaner (CASSSIDECAR-267) diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java index a18b92d6..3b665dae 100644 --- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java @@ -36,6 +36,17 @@ public class RestoreJobConstants public static final String JOB_CONSISTENCY_LEVEL = "consistencyLevel"; public static final String JOB_OPERATION_REASON = "reason"; public static final String JOB_LOCAL_DATA_CENTER = "localDatacenter"; + /** + * A boolean field; when restoreToLocalDatacenterOnly is set to true, localDatacenter must also be specified, and + * the restore job will only restore to the specified localDatacenter, regardless of the replication configuration + * of the keyspace the table belongs to. + * Use this option with caution: restoring only to the local datacenter while the table replicates to multiple + * datacenters could cause a large amount of repair streaming traffic. If you plan to rely on the intra-node + * repair feature to propagate the data, you can enable this option. + * Another use case that could justify the option is running distinct restore jobs, one per datacenter, concurrently. + * In this case, an external coordinator manages the restore job in each datacenter. 
+ */ + public static final String JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY = "restoreToLocalDatacenterOnly"; public static final String SLICE_ID = "sliceId"; public static final String BUCKET_ID = "bucketId"; public static final String SLICE_START_TOKEN = "startToken"; diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java index 9799d6ea..fa6580a5 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java @@ -33,6 +33,7 @@ import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.common.data.SSTableImportOptions; import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.apache.cassandra.sidecar.common.utils.StringUtils; import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_AGENT; @@ -41,6 +42,7 @@ import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_E import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_ID; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_IMPORT_OPTIONS; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_LOCAL_DATA_CENTER; +import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY; import static org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_SECRETS; /** @@ -55,6 +57,7 @@ public class CreateRestoreJobRequestPayload private final SSTableImportOptions importOptions; private final long expireAtInMillis; private final 
ConsistencyConfig consistencyConfig; + private final boolean localDatacenterOnly; /** * Builder to build a CreateRestoreJobRequest @@ -78,6 +81,8 @@ public class CreateRestoreJobRequestPayload * @param importOptions the configured options for SSTable import * @param expireAtInMillis a timestamp in the future when the job is considered expired * @param consistencyLevel consistency level a job should satisfy + * @param localDatacenter the local datacenter name; required when using a local consistency level or when localDatacenterOnly is true + * @param localDatacenterOnly whether the job should restore to the specified local datacenter only */ @JsonCreator public CreateRestoreJobRequestPayload(@JsonProperty(JOB_ID) UUID jobId, @@ -86,7 +91,8 @@ public class CreateRestoreJobRequestPayload @JsonProperty(JOB_IMPORT_OPTIONS) SSTableImportOptions importOptions, @JsonProperty(JOB_EXPIRE_AT) long expireAtInMillis, @JsonProperty(JOB_CONSISTENCY_LEVEL) String consistencyLevel, - @JsonProperty(JOB_LOCAL_DATA_CENTER) String localDatacenter) + @JsonProperty(JOB_LOCAL_DATA_CENTER) String localDatacenter, + @JsonProperty(JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY) boolean localDatacenterOnly) { Preconditions.checkArgument(jobId == null || jobId.version() == 1, "Only time based UUIDs allowed for jobId"); @@ -101,6 +107,9 @@ public class CreateRestoreJobRequestPayload : importOptions; this.expireAtInMillis = expireAtInMillis; this.consistencyConfig = ConsistencyConfig.parseString(consistencyLevel, localDatacenter); + Preconditions.checkArgument(!localDatacenterOnly || StringUtils.isNotEmpty(localDatacenter), + "Must specify a localDatacenter when restoreToLocalDatacenterOnly is true"); + this.localDatacenterOnly = localDatacenterOnly; } private CreateRestoreJobRequestPayload(Builder builder) @@ -111,7 +120,8 @@ public class CreateRestoreJobRequestPayload builder.importOptions, builder.expireAtInMillis, nameOrNull(builder.consistencyLevel), - builder.localDc); + builder.localDc, + 
builder.localDatacenterOnly); } /** @@ -189,6 +199,15 @@ public class CreateRestoreJobRequestPayload return consistencyConfig.localDatacenter; } + /** + * @return whether the job should restore only to its specified localDatacenter + */ + @JsonProperty(JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY) + public boolean shouldRestoreToLocalDatacenterOnly() + { + return localDatacenterOnly; + } + public ConsistencyConfig consistencyConfig() { return consistencyConfig; @@ -204,6 +223,7 @@ public class CreateRestoreJobRequestPayload JOB_EXPIRE_AT + "='" + expireAtInMillis + "', " + JOB_CONSISTENCY_LEVEL + "='" + consistencyLevel() + "', " + JOB_LOCAL_DATA_CENTER + "='" + localDatacenter() + "', " + + JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY + "='" + shouldRestoreToLocalDatacenterOnly() + "', " + JOB_IMPORT_OPTIONS + "='" + importOptions + "'}"; } @@ -220,6 +240,7 @@ public class CreateRestoreJobRequestPayload private String jobAgent = null; private ConsistencyLevel consistencyLevel = null; private String localDc = null; + private boolean localDatacenterOnly = false; Builder(RestoreJobSecrets secrets, long expireAtInMillis) { @@ -255,6 +276,11 @@ public class CreateRestoreJobRequestPayload }); } + public Builder restoreToLocalDatacenterOnly(boolean localDatacenterOnly) + { + return update(b -> b.localDatacenterOnly = localDatacenterOnly); + } + @Override public Builder self() { diff --git a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java index 2b8b17bb..3683b2ed 100644 --- a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java +++ b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java @@ -72,7 +72,8 @@ class CreateRestoreJobRequestPayloadTest "\"invalidateCaches\":\"true\"," + 
"\"copyData\":\"false\"}," + "\"expireAt\":" + expireAt + "," + - "\"consistencyLevel\":\"QUORUM\"}"); + "\"consistencyLevel\":\"QUORUM\"," + + "\"restoreToLocalDatacenterOnly\":false}"); CreateRestoreJobRequestPayload test = MAPPER.readValue(json, CreateRestoreJobRequestPayload.class); assertThat(test.jobId()).hasToString(id); assertThat(test.jobAgent()).isEqualTo("agent"); @@ -82,6 +83,7 @@ class CreateRestoreJobRequestPayloadTest assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults()); assertThat(test.consistencyLevel()).isEqualTo("QUORUM"); assertThat(test.localDatacenter()).isNull(); + assertThat(test.shouldRestoreToLocalDatacenterOnly()).isFalse(); } @Test @@ -184,6 +186,7 @@ class CreateRestoreJobRequestPayloadTest assertThat(test.expireAtAsDate()).isEqualTo(date); assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults()); assertThat(test.consistencyLevel()).isNull(); + assertThat(test.shouldRestoreToLocalDatacenterOnly()).isFalse(); } @Test @@ -225,6 +228,32 @@ class CreateRestoreJobRequestPayloadTest .build()) .hasMessage("Must specify a non-empty localDatacenter for consistency level: " + localCL.name()); } + } + @Test + void testRestoreToLocalDatacenterOnly() + { + RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets(); + CreateRestoreJobRequestPayload req = CreateRestoreJobRequestPayload + .builder(secrets, System.currentTimeMillis() + 10000) + .jobAgent("agent") + .consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "dc1") + .restoreToLocalDatacenterOnly(true) + .build(); + assertThat(req.shouldRestoreToLocalDatacenterOnly()).isTrue(); + } + + @Test + void testRestoreToLocalDatacenterOnlyWithSpecifyingLocalDatacenterFails() + { + RestoreJobSecrets secrets = RestoreJobSecretsGen.genRestoreJobSecrets(); + assertThatThrownBy(() -> CreateRestoreJobRequestPayload + .builder(secrets, System.currentTimeMillis() + 10000) + .jobAgent("agent") + .consistencyLevel(ConsistencyLevel.QUORUM) + 
.restoreToLocalDatacenterOnly(true) + .build()) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("Must specify a localDatacenter when restoreToLocalDatacenterOnly is true"); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java index 5f6ee8b0..f04367fc 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java @@ -63,6 +63,8 @@ public class RestoreJob public final short bucketCount; public final @Nullable ConsistencyLevel consistencyLevel; public final @Nullable String localDatacenter; + // whether a restore job should restore to the local Cassandra nodes only; default is false + public final boolean shouldRestoreToLocalDatacenterOnly; public final Manager restoreJobManager; public final Long sliceCount; @@ -94,6 +96,7 @@ public class RestoreJob .sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options"))) .consistencyLevel(consistencyConfig.consistencyLevel) .localDatacenter(consistencyConfig.localDatacenter) + .shouldRestoreToLocalDatacenterOnly(row.getBool("local_datacenter_only")) .sliceCount(row.get("slice_count", Long.class)); return builder.build(); @@ -134,12 +137,24 @@ public class RestoreJob || !builder.consistencyLevel.isLocalDcOnly || StringUtils.isNotEmpty(builder.localDatacenter), "When local consistency level is used, localDatacenter must also present"); + boolean hasEffectiveLocalDC = StringUtils.isNotEmpty(builder.localDatacenter); // log a warning when consistency level is absent or no local, but localDatacenter is defined - if ((builder.consistencyLevel == null || !builder.consistencyLevel.isLocalDcOnly) - && StringUtils.isNotEmpty(builder.localDatacenter)) + if ((builder.consistencyLevel == null || !builder.consistencyLevel.isLocalDcOnly) && hasEffectiveLocalDC) { LOGGER.warn("'localDatacenter' is defined but 
ignored. consistencyLevel={} localDatacenter={}", builder.consistencyLevel, builder.localDatacenter); + hasEffectiveLocalDC = false; + } + + if (builder.shouldRestoreToLocalDatacenterOnly && !hasEffectiveLocalDC) + { + this.shouldRestoreToLocalDatacenterOnly = false; + LOGGER.warn("shouldRestoreToLocalDatacenterOnly is true but 'localDatacenter' is not defined or invalid. " + + "Resetting shouldRestoreToLocalDatacenterOnly to false"); + } + else + { + this.shouldRestoreToLocalDatacenterOnly = builder.shouldRestoreToLocalDatacenterOnly; } this.createdAt = builder.createdAt; this.jobId = builder.jobId; @@ -213,12 +228,14 @@ public class RestoreJob return String.format("RestoreJob{" + "createdAt='%s', jobId='%s', keyspaceName='%s', " + "tableName='%s', status='%s', secrets='%s', importOptions='%s', " + - "expireAt='%s', bucketCount='%s', consistencyLevel='%s', localDatacenter='%s'}", + "expireAt='%s', bucketCount='%s', consistencyLevel='%s', localDatacenter='%s', " + + "shouldRestoreToLocalDatacenterOnly='%s'}", createdAt.toString(), jobId.toString(), keyspaceName, tableName, statusText, secrets, importOptions, expireAt, bucketCount, - consistencyLevel, localDatacenter); + consistencyLevel, localDatacenter, + shouldRestoreToLocalDatacenterOnly); } public static LocalDate toLocalDate(UUID jobId) @@ -256,6 +273,7 @@ public class RestoreJob private short bucketCount; private ConsistencyLevel consistencyLevel; private String localDatacenter; + private boolean shouldRestoreToLocalDatacenterOnly = false; private Manager manager; private Long sliceCount; @@ -367,6 +385,11 @@ public class RestoreJob return update(b -> b.localDatacenter = localDatacenter); } + public Builder shouldRestoreToLocalDatacenterOnly(boolean localDatacenterOnly) + { + return update(b -> b.shouldRestoreToLocalDatacenterOnly = localDatacenterOnly); + } + @Override public Builder self() { diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java index 39cb3ba5..e596fcb0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java @@ -85,6 +85,7 @@ public class RestoreJobDatabaseAccessor extends DatabaseAccessor<RestoreJobsSche .expireAt(payload.expireAtAsDate()) .consistencyLevel(payload.consistencyConfig().consistencyLevel) .localDatacenter(payload.consistencyConfig().localDatacenter) + .shouldRestoreToLocalDatacenterOnly(payload.shouldRestoreToLocalDatacenterOnly()) .build(); ByteBuffer secrets = serializeValue(job.secrets, "secrets"); ByteBuffer importOptions = serializeValue(job.importOptions, "sstable import options"); @@ -99,6 +100,7 @@ public class RestoreJobDatabaseAccessor extends DatabaseAccessor<RestoreJobsSche importOptions, job.consistencyLevelText(), job.localDatacenter, + job.shouldRestoreToLocalDatacenterOnly, job.expireAt); execute(statement); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java index f2faf544..15ccb5b9 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java @@ -32,7 +32,7 @@ import org.jetbrains.annotations.NotNull; */ public class RestoreJobsSchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly { - private static final String RESTORE_JOB_TABLE_NAME = "restore_job_v4"; + private static final String RESTORE_JOB_TABLE_NAME = "restore_job_v5"; private final SchemaKeyspaceConfiguration keyspaceConfig; private final SecondBoundConfiguration tableTtl; @@ -95,6 +95,7 @@ public class RestoreJobsSchema extends TableSchema implements ExecuteOnClusterLe " bucket_count smallint," + " consistency_level 
text," + " local_datacenter text," + + " local_datacenter_only boolean," + " PRIMARY KEY (created_at, job_id)" + ") WITH default_time_to_live = %s", keyspaceConfig.keyspace(), RESTORE_JOB_TABLE_NAME, tableTtl.toSeconds()); @@ -155,8 +156,9 @@ public class RestoreJobsSchema extends TableSchema implements ExecuteOnClusterLe " import_options," + " consistency_level," + " local_datacenter," + + " local_datacenter_only," + " expire_at" + - ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", config); + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", config); } static String updateBlobSecrets(SchemaKeyspaceConfiguration config) @@ -217,6 +219,7 @@ public class RestoreJobsSchema extends TableSchema implements ExecuteOnClusterLe "import_options, " + "consistency_level, " + "local_datacenter, " + + "local_datacenter_only, " + "expire_at, " + "slice_count " + "FROM %s.%s " + @@ -235,6 +238,7 @@ public class RestoreJobsSchema extends TableSchema implements ExecuteOnClusterLe "import_options, " + "consistency_level, " + "local_datacenter, " + + "local_datacenter_only, " + "expire_at, " + "slice_count " + "FROM %s.%s " + diff --git a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java index 15e8673d..83787dd5 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.slf4j.Logger; @@ -43,6 +44,7 @@ import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider; import 
org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; @@ -56,6 +58,7 @@ import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor; import org.apache.cassandra.sidecar.db.RestoreSlice; import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.metrics.server.RestoreMetrics; @@ -84,6 +87,7 @@ public class RestoreJobDiscoverer implements PeriodicTask, RingTopologyChangeLis private final JobIdsByDay jobIdsByDay; private final RingTopologyRefresher ringTopologyRefresher; private final AtomicBoolean isExecuting = new AtomicBoolean(false); + private String localDatacenter = null; private int inflightJobsCount = 0; private int jobDiscoveryRecencyDays; private PeriodicTaskExecutor periodicTaskExecutor; @@ -166,6 +170,7 @@ public class RestoreJobDiscoverer implements PeriodicTask, RingTopologyChangeLis @Override public void execute(Promise<Void> promise) { + initLocalDatacenterMaybe(); tryExecuteDiscovery(); promise.tryComplete(); } @@ -374,6 +379,15 @@ public class RestoreJobDiscoverer implements PeriodicTask, RingTopologyChangeLis return; } + // stop proceeding further if the local datacenter is excluded from the restore job + if (isLocalDatacenterExcluded(job)) + { + LOGGER.info("Restore job is configured to skip running on the local datacenter. 
" + + "jobId={} localDatacenter={} targetDatacenter={}", + job.jobId, localDatacenter, job.localDatacenter); + return; + } + // Only force refresh topology for the first time in each stage // RestoreJobDiscoverer is registered as a RingTopologyListener to receive future topology changed notifications, if any ringTopologyRefresher.register(job, this); @@ -385,6 +399,41 @@ public class RestoreJobDiscoverer implements PeriodicTask, RingTopologyChangeLis } } + private void initLocalDatacenterMaybe() + { + if (localDatacenter != null) + { + return; + } + + try + { + NodeSettings nodeSettings = instanceMetadataFetcher.callOnFirstAvailableInstance(i -> i.delegate().nodeSettings()); + localDatacenter = nodeSettings.datacenter(); + } + catch (CassandraUnavailableException cue) + { + LOGGER.debug("localDatacenter is not initialized", cue); + } + } + + private boolean isLocalDatacenterExcluded(RestoreJob job) + { + if (!job.shouldRestoreToLocalDatacenterOnly) + { + return false; + } + + if (localDatacenter == null) + { + LOGGER.debug("The restore job should restore only to the local datacenter, but the local datacenter is undetermined yet; skip this run"); + return true; + } + + // when job should restore to local datacenter only, but the target datacenter is not the local one + return !Objects.equals(localDatacenter, job.localDatacenter); + } + private boolean shouldFindSlicesAndSubmit(RestoreJob job) { return (job.status == RestoreJobStatus.STAGE_READY || job.status == RestoreJobStatus.IMPORT_READY) diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java index eb4a7886..8f8fbe71 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java @@ -19,10 +19,12 @@ package 
org.apache.cassandra.sidecar.db; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import com.datastax.driver.core.utils.UUIDs; +import org.apache.cassandra.sidecar.common.data.ConsistencyLevel; import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload; @@ -80,16 +82,36 @@ class RestoreJobDatabaseAccessorIntTest extends IntegrationTestBase assertThat(foundJobs).hasSize(3); accessor.abort(jobId, null); assertJob(accessor.find(jobId), jobId, RestoreJobStatus.ABORTED, expiresAtMillis, secrets, null); + + // create job that restoreToLocalDatacenterOnly + UUID restoreToLocalDcOnlyJobId = createJob(accessor, true); + foundJobs = accessor.findAllRecent(now, 3); + assertThat(foundJobs).hasSize(4); + Optional<RestoreJob> restoreToLocalDcOnlyJob = foundJobs.stream() + .filter(j -> j.jobId.equals(restoreToLocalDcOnlyJobId)) + .findFirst(); + assertThat(restoreToLocalDcOnlyJob).isNotEmpty(); + assertJob(restoreToLocalDcOnlyJob.get(), restoreToLocalDcOnlyJobId, RestoreJobStatus.CREATED, expiresAtMillis, secrets); + assertThat(restoreToLocalDcOnlyJob.get().shouldRestoreToLocalDatacenterOnly).isTrue(); } private UUID createJob(RestoreJobDatabaseAccessor accessor) + { + return createJob(accessor, false); + } + + private UUID createJob(RestoreJobDatabaseAccessor accessor, boolean restoreToLocalDatacenterOnly) { UUID jobId = UUIDs.timeBased(); - CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis) - .jobId(jobId) - .jobAgent("agent") - .build(); - accessor.create(payload, qualifiedTableName); + CreateRestoreJobRequestPayload.Builder builder = CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis) + .jobId(jobId) + .jobAgent("agent"); + if (restoreToLocalDatacenterOnly) + { + 
builder.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "dc1"); + builder.restoreToLocalDatacenterOnly(true); + } + accessor.create(builder.build(), qualifiedTableName); return jobId; } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java index c57876c0..138ce81b 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java @@ -137,7 +137,7 @@ public class SidecarSchemaTest assertThat(interceptedExecStmts.get(0)).as("Create keyspace should be executed the first") .contains("CREATE KEYSPACE IF NOT EXISTS sidecar_internal"); assertThat(interceptedExecStmts).as("Create table should be executed for job table") - .anyMatch(stmt -> stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.restore_job_v4")); + .anyMatch(stmt -> stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.restore_job_v5")); assertThat(interceptedExecStmts).as("Create table should be executed for slice table") .anyMatch(stmt -> stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.restore_slice_v3")); assertThat(interceptedExecStmts).as("Create table should be executed for range table") @@ -146,25 +146,27 @@ public class SidecarSchemaTest .anyMatch(stmt -> stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.role_permissions_v1")); List<String> expectedPrepStatements = Arrays.asList( - "INSERT INTO sidecar_internal.restore_job_v4 ( created_at, job_id, keyspace_name, table_name, " + - "job_agent, status, blob_secrets, import_options, consistency_level, local_datacenter, expire_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO sidecar_internal.restore_job_v5 ( created_at, job_id, keyspace_name, table_name, " + + "job_agent, status, blob_secrets, import_options, consistency_level, local_datacenter, local_datacenter_only, expire_at) " + + "VALUES 
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - "INSERT INTO sidecar_internal.restore_job_v4 ( created_at, job_id, blob_secrets) VALUES (?, ? ,?)", + "INSERT INTO sidecar_internal.restore_job_v5 ( created_at, job_id, blob_secrets) VALUES (?, ? ,?)", - "INSERT INTO sidecar_internal.restore_job_v4 ( created_at, job_id, status) VALUES (?, ?, ?)", + "INSERT INTO sidecar_internal.restore_job_v5 ( created_at, job_id, status) VALUES (?, ?, ?)", - "INSERT INTO sidecar_internal.restore_job_v4 ( created_at, job_id, job_agent) VALUES (?, ?, ?)", + "INSERT INTO sidecar_internal.restore_job_v5 ( created_at, job_id, job_agent) VALUES (?, ?, ?)", - "INSERT INTO sidecar_internal.restore_job_v4 ( created_at, job_id, expire_at) VALUES (?, ?, ?)", + "INSERT INTO sidecar_internal.restore_job_v5 ( created_at, job_id, expire_at) VALUES (?, ?, ?)", - "INSERT INTO sidecar_internal.restore_job_v4 ( created_at, job_id, slice_count) VALUES (?, ?, ?)", + "INSERT INTO sidecar_internal.restore_job_v5 ( created_at, job_id, slice_count) VALUES (?, ?, ?)", "SELECT created_at, job_id, keyspace_name, table_name, job_agent, status, blob_secrets, import_options, " + - "consistency_level, local_datacenter, expire_at, slice_count FROM sidecar_internal.restore_job_v4 WHERE created_at = ? AND job_id = ?", + "consistency_level, local_datacenter, local_datacenter_only, expire_at, slice_count " + + "FROM sidecar_internal.restore_job_v5 WHERE created_at = ? 
AND job_id = ?", "SELECT created_at, job_id, keyspace_name, table_name, job_agent, status, blob_secrets, import_options, " + - "consistency_level, local_datacenter, expire_at, slice_count FROM sidecar_internal.restore_job_v4 WHERE created_at = ?", + "consistency_level, local_datacenter, local_datacenter_only, expire_at, slice_count " + + "FROM sidecar_internal.restore_job_v5 WHERE created_at = ?", "INSERT INTO sidecar_internal.restore_slice_v3 ( job_id, bucket_id, slice_id, bucket, key, " + "checksum, start_token, end_token, compressed_size, uncompressed_size) " + @@ -210,7 +212,7 @@ public class SidecarSchemaTest .containsExactlyInAnyOrderElementsOf(expectedPrepStatements); assertThat(sidecarSchema.isInitialized()).as("Schema is successfully initialized").isTrue(); - assertTableSchema(sidecarSchema.tableSchema(RestoreJobsSchema.class), "sidecar_internal.restore_job_v4"); + assertTableSchema(sidecarSchema.tableSchema(RestoreJobsSchema.class), "sidecar_internal.restore_job_v5"); assertTableSchema(sidecarSchema.tableSchema(RestoreRangesSchema.class), "sidecar_internal.restore_range_v1"); assertTableSchema(sidecarSchema.tableSchema(RestoreSlicesSchema.class), "sidecar_internal.restore_slice_v3"); assertTableSchema(sidecarSchema.tableSchema(SidecarLeaseSchema.class), "sidecar_internal.sidecar_lease_v1"); diff --git a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java index d4c3c384..9eec31fc 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java @@ -39,6 +39,7 @@ import org.apache.cassandra.sidecar.TestModule; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.data.ConsistencyLevel; import 
org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; @@ -50,6 +51,7 @@ import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor; import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; import org.apache.cassandra.sidecar.db.RestoreSliceTest; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; @@ -58,10 +60,12 @@ import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; import org.apache.cassandra.sidecar.tasks.ScheduleDecision; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import static org.apache.cassandra.sidecar.db.RestoreJobTest.createNewTestingJob; import static org.apache.cassandra.sidecar.db.RestoreJobTest.createTestingJob; import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.JMX; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -90,6 +94,7 @@ class RestoreJobDiscovererTest private final SidecarSchema sidecarSchema = mock(SidecarSchema.class); private final RingTopologyRefresher ringTopologyRefresher = mock(RingTopologyRefresher.class); private final InstanceMetadataFetcher instanceMetadataFetcher = 
mock(InstanceMetadataFetcher.class); + private final NodeSettings mockNodeSettings = mock(NodeSettings.class); private SidecarMetrics metrics; private RestoreJobDiscoverer loop; @@ -98,8 +103,9 @@ class RestoreJobDiscovererTest { MetricRegistryFactory mockRegistryFactory = mock(MetricRegistryFactory.class); when(mockRegistryFactory.getOrCreate()).thenReturn(registry()); - InstanceMetadataFetcher mockMetadataFetcher = mock(InstanceMetadataFetcher.class); - metrics = new SidecarMetricsImpl(mockRegistryFactory, mockMetadataFetcher); + when(mockNodeSettings.datacenter()).thenReturn("dc1"); + when(instanceMetadataFetcher.callOnFirstAvailableInstance(any())).thenReturn(mockNodeSettings); + metrics = new SidecarMetricsImpl(mockRegistryFactory, instanceMetadataFetcher); loop = new RestoreJobDiscoverer(testConfig(), sidecarSchema, mockJobAccessor, @@ -379,6 +385,59 @@ class RestoreJobDiscovererTest .isEqualTo(10); } + @Test + void testSkipNotOwnedRestoreToLocalDatacenterOnlyJob() + { + // Create a restore job that restores to dc2 only. Meanwhile, discoverer runs in dc1. + UUID jobId = UUIDs.timeBased(); + when(mockJobAccessor.findAllRecent(anyLong(), anyInt())) + .thenReturn(Collections.singletonList(RestoreJob.builder() + .createdAt(RestoreJob.toLocalDate(jobId)) + .jobId(jobId) + .jobAgent("agent") + .jobStatus(RestoreJobStatus.CREATED) + .expireAt(new Date(System.currentTimeMillis() + 10000L)) + .consistencyLevel(ConsistencyLevel.LOCAL_QUORUM) + .localDatacenter("dc2") + .shouldRestoreToLocalDatacenterOnly(true) + .build())); + executeBlocking(); + verify(mockRangeAccessor, never()).create(any()); + } + + @Test + void testRestoreToLocalDatacenterOnlyJobIsOnHoldWhenLocalDatacenterIsUndetermined() + { + // local datacenter is undetermined and the restore job is configured to restore to local datacenter only. + // the job is on hold until local datacenter is resolved in discoverer. 
+ when(instanceMetadataFetcher.callOnFirstAvailableInstance(any())).thenThrow(new CassandraUnavailableException(JMX, "NodeSettings unavailable")); + UUID jobId = UUIDs.timeBased(); + when(mockJobAccessor.findAllRecent(anyLong(), anyInt())) + .thenReturn(Collections.singletonList(RestoreJob.builder() + .createdAt(RestoreJob.toLocalDate(jobId)) + .jobId(jobId) + .jobAgent("agent") + .jobStatus(RestoreJobStatus.CREATED) + .expireAt(new Date(System.currentTimeMillis() + 10000L)) + .consistencyLevel(ConsistencyLevel.LOCAL_QUORUM) + .localDatacenter("dc1") + .shouldRestoreToLocalDatacenterOnly(true) + .build())); + + executeBlocking(); + verify(ringTopologyRefresher, never()).register(any(), any()); + + // in the new run, the local datacenter is discovered. The discoverer proceeds further and register the job + Mockito.reset(instanceMetadataFetcher); + when(instanceMetadataFetcher.callOnFirstAvailableInstance(any())).thenReturn(mockNodeSettings); + executeBlocking(); + ArgumentCaptor<RestoreJob> restoreJobCaptor = ArgumentCaptor.forClass(RestoreJob.class); + verify(ringTopologyRefresher).register(restoreJobCaptor.capture(), any()); + assertThat(restoreJobCaptor.getAllValues()).hasSize(1); + RestoreJob captured = restoreJobCaptor.getValue(); + assertThat(captured.jobId).isEqualTo(jobId); + } + private RestoreJobConfiguration testConfig() { RestoreJobConfiguration restoreJobConfiguration = mock(RestoreJobConfiguration.class); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org