This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 52c1bb02 CASSSIDECAR-269: Allow restore jobs to restore to the local 
datacenter only (#234)
52c1bb02 is described below

commit 52c1bb02376af4a29467d364b17a99fb70da53b3
Author: Yifan Cai <y...@apache.org>
AuthorDate: Thu Jul 3 09:54:39 2025 -0700

    CASSSIDECAR-269: Allow restore jobs to restore to the local datacenter only 
(#234)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSSIDECAR-269
---
 CHANGES.txt                                        |  1 +
 .../sidecar/common/data/RestoreJobConstants.java   | 11 ++++
 .../data/CreateRestoreJobRequestPayload.java       | 30 ++++++++++-
 .../CreateRestoreJobRequestPayloadTest.java        | 31 ++++++++++-
 .../apache/cassandra/sidecar/db/RestoreJob.java    | 31 +++++++++--
 .../sidecar/db/RestoreJobDatabaseAccessor.java     |  2 +
 .../sidecar/db/schema/RestoreJobsSchema.java       |  8 ++-
 .../sidecar/restore/RestoreJobDiscoverer.java      | 49 +++++++++++++++++
 .../db/RestoreJobDatabaseAccessorIntTest.java      | 32 +++++++++--
 .../cassandra/sidecar/db/SidecarSchemaTest.java    | 26 ++++-----
 .../sidecar/restore/RestoreJobDiscovererTest.java  | 63 +++++++++++++++++++++-
 11 files changed, 256 insertions(+), 28 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index df2abe09..cc436d6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Allow restore jobs to restore to the local datacenter only (CASSSIDECAR-269)
  * Avoid ending response in authentication handler, doesn't allow chaining of 
auth handlers (CASSSIDECAR-270)
  * Enhanced Sidecarclient to list Cassandra instance files and to download 
them for Live Migration (CASSSIDECAR-224)
  * Fix CdcRawDirectorySpaceCleaner (CASSSIDECAR-267)
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
index a18b92d6..3b665dae 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
@@ -36,6 +36,17 @@ public class RestoreJobConstants
     public static final String JOB_CONSISTENCY_LEVEL = "consistencyLevel";
     public static final String JOB_OPERATION_REASON = "reason";
     public static final String JOB_LOCAL_DATA_CENTER = "localDatacenter";
+    /**
+     * A boolean field; when restoreToLocalDatacenterOnly is set to true, 
localDatacenter must also be specified, and
+     * the restore job will only restore to the specified localDatacenter, 
regardless of the replication configuration
+     * of the belonging keyspace.
+     * The option should be used with caution. To help build an educated 
decision, when only restoring to the local
+     * datacenter, but the table replicates to multiple datacenters, it could 
cause a large amount of repair streaming
+     * traffic. Arguably, you might want to leverage the intra-node repair 
feature, then you can enable this option.
+     * Another use case that could justify the option is running distinct 
restore jobs, one per datacenter, concurrently.
+     * In this case, there is an external coordinator that manges the restore 
job in each datacenter.
+     */
+    public static final String JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY = 
"restoreToLocalDatacenterOnly";
     public static final String SLICE_ID = "sliceId";
     public static final String BUCKET_ID = "bucketId";
     public static final String SLICE_START_TOKEN = "startToken";
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
index 9799d6ea..fa6580a5 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java
@@ -33,6 +33,7 @@ import 
org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
 import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.apache.cassandra.sidecar.common.utils.StringUtils;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_AGENT;
@@ -41,6 +42,7 @@ import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_E
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_ID;
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_IMPORT_OPTIONS;
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_LOCAL_DATA_CENTER;
+import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY;
 import static 
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_SECRETS;
 
 /**
@@ -55,6 +57,7 @@ public class CreateRestoreJobRequestPayload
     private final SSTableImportOptions importOptions;
     private final long expireAtInMillis;
     private final ConsistencyConfig consistencyConfig;
+    private final boolean localDatacenterOnly;
 
     /**
      * Builder to build a CreateRestoreJobRequest
@@ -78,6 +81,8 @@ public class CreateRestoreJobRequestPayload
      * @param importOptions    the configured options for SSTable import
      * @param expireAtInMillis a timestamp in the future when the job is 
considered expired
      * @param consistencyLevel consistency level a job should satisfy
+     * @param localDatacenter  the local datacenter name; required if using 
local consistency level and localDatacenterOnly is specified
+     * @param localDatacenterOnly whether the job should restore to the 
specified local datacenter only
      */
     @JsonCreator
     public CreateRestoreJobRequestPayload(@JsonProperty(JOB_ID) UUID jobId,
@@ -86,7 +91,8 @@ public class CreateRestoreJobRequestPayload
                                           @JsonProperty(JOB_IMPORT_OPTIONS) 
SSTableImportOptions importOptions,
                                           @JsonProperty(JOB_EXPIRE_AT) long 
expireAtInMillis,
                                           @JsonProperty(JOB_CONSISTENCY_LEVEL) 
String consistencyLevel,
-                                          @JsonProperty(JOB_LOCAL_DATA_CENTER) 
String localDatacenter)
+                                          @JsonProperty(JOB_LOCAL_DATA_CENTER) 
String localDatacenter,
+                                          
@JsonProperty(JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY) boolean 
localDatacenterOnly)
     {
         Preconditions.checkArgument(jobId == null || jobId.version() == 1,
                                     "Only time based UUIDs allowed for jobId");
@@ -101,6 +107,9 @@ public class CreateRestoreJobRequestPayload
                              : importOptions;
         this.expireAtInMillis = expireAtInMillis;
         this.consistencyConfig = 
ConsistencyConfig.parseString(consistencyLevel, localDatacenter);
+        Preconditions.checkArgument(!localDatacenterOnly || 
StringUtils.isNotEmpty(localDatacenter),
+                                    "Must specify a localDatacenter when 
restoreToLocalDatacenterOnly is true");
+        this.localDatacenterOnly = localDatacenterOnly;
     }
 
     private CreateRestoreJobRequestPayload(Builder builder)
@@ -111,7 +120,8 @@ public class CreateRestoreJobRequestPayload
              builder.importOptions,
              builder.expireAtInMillis,
              nameOrNull(builder.consistencyLevel),
-             builder.localDc);
+             builder.localDc,
+             builder.localDatacenterOnly);
     }
 
     /**
@@ -189,6 +199,15 @@ public class CreateRestoreJobRequestPayload
         return consistencyConfig.localDatacenter;
     }
 
+    /**
+     * @return whether the job should restore only to its specified 
localDatacenter
+     */
+    @JsonProperty(JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY)
+    public boolean shouldRestoreToLocalDatacenterOnly()
+    {
+        return localDatacenterOnly;
+    }
+
     public ConsistencyConfig consistencyConfig()
     {
         return consistencyConfig;
@@ -204,6 +223,7 @@ public class CreateRestoreJobRequestPayload
                JOB_EXPIRE_AT + "='" + expireAtInMillis + "', " +
                JOB_CONSISTENCY_LEVEL + "='" + consistencyLevel() + "', " +
                JOB_LOCAL_DATA_CENTER + "='" + localDatacenter() + "', " +
+               JOB_RESTORE_TO_LOCAL_DATA_CENTER_ONLY + "='" + 
shouldRestoreToLocalDatacenterOnly() + "', " +
                JOB_IMPORT_OPTIONS + "='" + importOptions + "'}";
     }
 
@@ -220,6 +240,7 @@ public class CreateRestoreJobRequestPayload
         private String jobAgent = null;
         private ConsistencyLevel consistencyLevel = null;
         private String localDc = null;
+        private boolean localDatacenterOnly = false;
 
         Builder(RestoreJobSecrets secrets, long expireAtInMillis)
         {
@@ -255,6 +276,11 @@ public class CreateRestoreJobRequestPayload
             });
         }
 
+        public Builder restoreToLocalDatacenterOnly(boolean 
localDatacenterOnly)
+        {
+            return update(b -> b.localDatacenterOnly = localDatacenterOnly);
+        }
+
         @Override
         public Builder self()
         {
diff --git 
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
index 2b8b17bb..3683b2ed 100644
--- 
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
+++ 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
@@ -72,7 +72,8 @@ class CreateRestoreJobRequestPayloadTest
                                    "\"invalidateCaches\":\"true\"," +
                                    "\"copyData\":\"false\"}," +
                                    "\"expireAt\":" + expireAt + "," +
-                                   "\"consistencyLevel\":\"QUORUM\"}");
+                                   "\"consistencyLevel\":\"QUORUM\"," +
+                                   "\"restoreToLocalDatacenterOnly\":false}");
         CreateRestoreJobRequestPayload test = MAPPER.readValue(json, 
CreateRestoreJobRequestPayload.class);
         assertThat(test.jobId()).hasToString(id);
         assertThat(test.jobAgent()).isEqualTo("agent");
@@ -82,6 +83,7 @@ class CreateRestoreJobRequestPayloadTest
         
assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults());
         assertThat(test.consistencyLevel()).isEqualTo("QUORUM");
         assertThat(test.localDatacenter()).isNull();
+        assertThat(test.shouldRestoreToLocalDatacenterOnly()).isFalse();
     }
 
     @Test
@@ -184,6 +186,7 @@ class CreateRestoreJobRequestPayloadTest
         assertThat(test.expireAtAsDate()).isEqualTo(date);
         
assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults());
         assertThat(test.consistencyLevel()).isNull();
+        assertThat(test.shouldRestoreToLocalDatacenterOnly()).isFalse();
     }
 
     @Test
@@ -225,6 +228,32 @@ class CreateRestoreJobRequestPayloadTest
                                                                    .build())
             .hasMessage("Must specify a non-empty localDatacenter for 
consistency level: " + localCL.name());
         }
+    }
 
+    @Test
+    void testRestoreToLocalDatacenterOnly()
+    {
+        RestoreJobSecrets secrets = 
RestoreJobSecretsGen.genRestoreJobSecrets();
+        CreateRestoreJobRequestPayload req = CreateRestoreJobRequestPayload
+                                             .builder(secrets, 
System.currentTimeMillis() + 10000)
+                                             .jobAgent("agent")
+                                             
.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "dc1")
+                                             
.restoreToLocalDatacenterOnly(true)
+                                             .build();
+        assertThat(req.shouldRestoreToLocalDatacenterOnly()).isTrue();
+    }
+
+    @Test
+    void testRestoreToLocalDatacenterOnlyWithSpecifyingLocalDatacenterFails()
+    {
+        RestoreJobSecrets secrets = 
RestoreJobSecretsGen.genRestoreJobSecrets();
+        assertThatThrownBy(() -> CreateRestoreJobRequestPayload
+                                 .builder(secrets, System.currentTimeMillis() 
+ 10000)
+                                 .jobAgent("agent")
+                                 .consistencyLevel(ConsistencyLevel.QUORUM)
+                                 .restoreToLocalDatacenterOnly(true)
+                                 .build())
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Must specify a localDatacenter when 
restoreToLocalDatacenterOnly is true");
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
index 5f6ee8b0..f04367fc 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
@@ -63,6 +63,8 @@ public class RestoreJob
     public final short bucketCount;
     public final @Nullable ConsistencyLevel consistencyLevel;
     public final @Nullable String localDatacenter;
+    // whether a restore job should restore to the local Cassandra nodes only; 
default is false
+    public final boolean shouldRestoreToLocalDatacenterOnly;
     public final Manager restoreJobManager;
     public final Long sliceCount;
 
@@ -94,6 +96,7 @@ public class RestoreJob
                
.sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options")))
                .consistencyLevel(consistencyConfig.consistencyLevel)
                .localDatacenter(consistencyConfig.localDatacenter)
+               
.shouldRestoreToLocalDatacenterOnly(row.getBool("local_datacenter_only"))
                .sliceCount(row.get("slice_count", Long.class));
 
         return builder.build();
@@ -134,12 +137,24 @@ public class RestoreJob
                                     || !builder.consistencyLevel.isLocalDcOnly
                                     || 
StringUtils.isNotEmpty(builder.localDatacenter),
                                     "When local consistency level is used, 
localDatacenter must also present");
+        boolean hasEffectiveLocalDC = 
StringUtils.isNotEmpty(builder.localDatacenter);
         // log a warning when consistency level is absent or no local, but 
localDatacenter is defined
-        if ((builder.consistencyLevel == null || 
!builder.consistencyLevel.isLocalDcOnly)
-            && StringUtils.isNotEmpty(builder.localDatacenter))
+        if ((builder.consistencyLevel == null || 
!builder.consistencyLevel.isLocalDcOnly) && hasEffectiveLocalDC)
         {
             LOGGER.warn("'localDatacenter' is defined but ignored. 
consistencyLevel={} localDatacenter={}",
                         builder.consistencyLevel, builder.localDatacenter);
+            hasEffectiveLocalDC = false;
+        }
+
+        if (builder.shouldRestoreToLocalDatacenterOnly && !hasEffectiveLocalDC)
+        {
+            this.shouldRestoreToLocalDatacenterOnly = false;
+            LOGGER.warn("shouldRestoreToLocalDatacenterOnly is true but 
'localDatacenter' is not defined or invalid. " +
+                        "Resetting shouldRestoreToLocalDatacenterOnly to 
false");
+        }
+        else
+        {
+            this.shouldRestoreToLocalDatacenterOnly = 
builder.shouldRestoreToLocalDatacenterOnly;
         }
         this.createdAt = builder.createdAt;
         this.jobId = builder.jobId;
@@ -213,12 +228,14 @@ public class RestoreJob
         return String.format("RestoreJob{" +
                              "createdAt='%s', jobId='%s', keyspaceName='%s', " 
+
                              "tableName='%s', status='%s', secrets='%s', 
importOptions='%s', " +
-                             "expireAt='%s', bucketCount='%s', 
consistencyLevel='%s', localDatacenter='%s'}",
+                             "expireAt='%s', bucketCount='%s', 
consistencyLevel='%s', localDatacenter='%s', " +
+                             "shouldRestoreToLocalDatacenterOnly='%s'}",
                              createdAt.toString(), jobId.toString(),
                              keyspaceName, tableName,
                              statusText, secrets, importOptions,
                              expireAt, bucketCount,
-                             consistencyLevel, localDatacenter);
+                             consistencyLevel, localDatacenter,
+                             shouldRestoreToLocalDatacenterOnly);
     }
 
     public static LocalDate toLocalDate(UUID jobId)
@@ -256,6 +273,7 @@ public class RestoreJob
         private short bucketCount;
         private ConsistencyLevel consistencyLevel;
         private String localDatacenter;
+        private boolean shouldRestoreToLocalDatacenterOnly = false;
         private Manager manager;
         private Long sliceCount;
 
@@ -367,6 +385,11 @@ public class RestoreJob
             return update(b -> b.localDatacenter = localDatacenter);
         }
 
+        public Builder shouldRestoreToLocalDatacenterOnly(boolean 
localDatacenterOnly)
+        {
+            return update(b -> b.shouldRestoreToLocalDatacenterOnly = 
localDatacenterOnly);
+        }
+
         @Override
         public Builder self()
         {
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index 39cb3ba5..e596fcb0 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -85,6 +85,7 @@ public class RestoreJobDatabaseAccessor extends 
DatabaseAccessor<RestoreJobsSche
                                    .expireAt(payload.expireAtAsDate())
                                    
.consistencyLevel(payload.consistencyConfig().consistencyLevel)
                                    
.localDatacenter(payload.consistencyConfig().localDatacenter)
+                                   
.shouldRestoreToLocalDatacenterOnly(payload.shouldRestoreToLocalDatacenterOnly())
                                    .build();
         ByteBuffer secrets = serializeValue(job.secrets, "secrets");
         ByteBuffer importOptions = serializeValue(job.importOptions, "sstable 
import options");
@@ -99,6 +100,7 @@ public class RestoreJobDatabaseAccessor extends 
DatabaseAccessor<RestoreJobsSche
                                                     importOptions,
                                                     job.consistencyLevelText(),
                                                     job.localDatacenter,
+                                                    
job.shouldRestoreToLocalDatacenterOnly,
                                                     job.expireAt);
 
         execute(statement);
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
index f2faf544..15ccb5b9 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.NotNull;
  */
 public class RestoreJobsSchema extends TableSchema implements 
ExecuteOnClusterLeaseholderOnly
 {
-    private static final String RESTORE_JOB_TABLE_NAME = "restore_job_v4";
+    private static final String RESTORE_JOB_TABLE_NAME = "restore_job_v5";
 
     private final SchemaKeyspaceConfiguration keyspaceConfig;
     private final SecondBoundConfiguration tableTtl;
@@ -95,6 +95,7 @@ public class RestoreJobsSchema extends TableSchema implements 
ExecuteOnClusterLe
                              "  bucket_count smallint," +
                              "  consistency_level text," +
                              "  local_datacenter text," +
+                             "  local_datacenter_only boolean," +
                              "  PRIMARY KEY (created_at, job_id)" +
                              ") WITH default_time_to_live = %s",
                              keyspaceConfig.keyspace(), 
RESTORE_JOB_TABLE_NAME, tableTtl.toSeconds());
@@ -155,8 +156,9 @@ public class RestoreJobsSchema extends TableSchema 
implements ExecuteOnClusterLe
                              "  import_options," +
                              "  consistency_level," +
                              "  local_datacenter," +
+                             "  local_datacenter_only," +
                              "  expire_at" +
-                             ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 
config);
+                             ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 
config);
         }
 
         static String updateBlobSecrets(SchemaKeyspaceConfiguration config)
@@ -217,6 +219,7 @@ public class RestoreJobsSchema extends TableSchema 
implements ExecuteOnClusterLe
                              "import_options, " +
                              "consistency_level, " +
                              "local_datacenter, " +
+                             "local_datacenter_only, " +
                              "expire_at, " +
                              "slice_count " +
                              "FROM %s.%s " +
@@ -235,6 +238,7 @@ public class RestoreJobsSchema extends TableSchema 
implements ExecuteOnClusterLe
                              "import_options, " +
                              "consistency_level, " +
                              "local_datacenter, " +
+                             "local_datacenter_only, " +
                              "expire_at, " +
                              "slice_count " +
                              "FROM %s.%s " +
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index 15e8673d..83787dd5 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -43,6 +44,7 @@ import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
@@ -56,6 +58,7 @@ import 
org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
 import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.server.RestoreMetrics;
@@ -84,6 +87,7 @@ public class RestoreJobDiscoverer implements PeriodicTask, 
RingTopologyChangeLis
     private final JobIdsByDay jobIdsByDay;
     private final RingTopologyRefresher ringTopologyRefresher;
     private final AtomicBoolean isExecuting = new AtomicBoolean(false);
+    private String localDatacenter = null;
     private int inflightJobsCount = 0;
     private int jobDiscoveryRecencyDays;
     private PeriodicTaskExecutor periodicTaskExecutor;
@@ -166,6 +170,7 @@ public class RestoreJobDiscoverer implements PeriodicTask, 
RingTopologyChangeLis
     @Override
     public void execute(Promise<Void> promise)
     {
+        initLocalDatacenterMaybe();
         tryExecuteDiscovery();
         promise.tryComplete();
     }
@@ -374,6 +379,15 @@ public class RestoreJobDiscoverer implements PeriodicTask, 
RingTopologyChangeLis
             return;
         }
 
+        // stop proceeding further if the local datacenter is excluded from 
the restore job
+        if (isLocalDatacenterExcluded(job))
+        {
+            LOGGER.info("Restore job is configured to skip running on the 
local datacenter. " +
+                        "jobId={} localDatacenter={} targetDatacenter={}",
+                        job.jobId, localDatacenter, job.localDatacenter);
+            return;
+        }
+
         // Only force refresh topology for the first time in each stage
         // RestoreJobDiscoverer is registered as a RingTopologyListener to 
receive future topology changed notifications, if any
         ringTopologyRefresher.register(job, this);
@@ -385,6 +399,41 @@ public class RestoreJobDiscoverer implements PeriodicTask, 
RingTopologyChangeLis
         }
     }
 
+    private void initLocalDatacenterMaybe()
+    {
+        if (localDatacenter != null)
+        {
+            return;
+        }
+
+        try
+        {
+            NodeSettings nodeSettings = 
instanceMetadataFetcher.callOnFirstAvailableInstance(i -> 
i.delegate().nodeSettings());
+            localDatacenter = nodeSettings.datacenter();
+        }
+        catch (CassandraUnavailableException cue)
+        {
+            LOGGER.debug("localDatacenter is not initialized", cue);
+        }
+    }
+
+    private boolean isLocalDatacenterExcluded(RestoreJob job)
+    {
+        if (!job.shouldRestoreToLocalDatacenterOnly)
+        {
+            return false;
+        }
+
+        if (localDatacenter == null)
+        {
+            LOGGER.debug("The restore job should restore only to the local 
datacenter, but the local datacenter is undetermined yet; skip this run");
+            return true;
+        }
+
+        // when job should restore to local datacenter only, but the target 
datacenter is not the local one
+        return !Objects.equals(localDatacenter, job.localDatacenter);
+    }
+
     private boolean shouldFindSlicesAndSubmit(RestoreJob job)
     {
         return (job.status == RestoreJobStatus.STAGE_READY || job.status == 
RestoreJobStatus.IMPORT_READY)
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java
index eb4a7886..8f8fbe71 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java
@@ -19,10 +19,12 @@
 package org.apache.cassandra.sidecar.db;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.ConsistencyLevel;
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import 
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
@@ -80,16 +82,36 @@ class RestoreJobDatabaseAccessorIntTest extends 
IntegrationTestBase
         assertThat(foundJobs).hasSize(3);
         accessor.abort(jobId, null);
         assertJob(accessor.find(jobId), jobId, RestoreJobStatus.ABORTED, 
expiresAtMillis, secrets, null);
+
+        // create job that restoreToLocalDatacenterOnly
+        UUID restoreToLocalDcOnlyJobId = createJob(accessor, true);
+        foundJobs = accessor.findAllRecent(now, 3);
+        assertThat(foundJobs).hasSize(4);
+        Optional<RestoreJob> restoreToLocalDcOnlyJob = foundJobs.stream()
+                                                                .filter(j -> 
j.jobId.equals(restoreToLocalDcOnlyJobId))
+                                                                .findFirst();
+        assertThat(restoreToLocalDcOnlyJob).isNotEmpty();
+        assertJob(restoreToLocalDcOnlyJob.get(), restoreToLocalDcOnlyJobId, 
RestoreJobStatus.CREATED, expiresAtMillis, secrets);
+        
assertThat(restoreToLocalDcOnlyJob.get().shouldRestoreToLocalDatacenterOnly).isTrue();
     }
 
     private UUID createJob(RestoreJobDatabaseAccessor accessor)
+    {
+        return createJob(accessor, false);
+    }
+
+    private UUID createJob(RestoreJobDatabaseAccessor accessor, boolean 
restoreToLocalDatacenterOnly)
     {
         UUID jobId = UUIDs.timeBased();
-        CreateRestoreJobRequestPayload payload = 
CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis)
-                                                                               
.jobId(jobId)
-                                                                               
.jobAgent("agent")
-                                                                               
.build();
-        accessor.create(payload, qualifiedTableName);
+        CreateRestoreJobRequestPayload.Builder builder = 
CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis)
+                                                                               
        .jobId(jobId)
+                                                                               
        .jobAgent("agent");
+        if (restoreToLocalDatacenterOnly)
+        {
+            builder.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "dc1");
+            builder.restoreToLocalDatacenterOnly(true);
+        }
+        accessor.create(builder.build(), qualifiedTableName);
 
         return jobId;
     }
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
index c57876c0..138ce81b 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
@@ -137,7 +137,7 @@ public class SidecarSchemaTest
             assertThat(interceptedExecStmts.get(0)).as("Create keyspace should 
be executed the first")
                                                    .contains("CREATE KEYSPACE 
IF NOT EXISTS sidecar_internal");
             assertThat(interceptedExecStmts).as("Create table should be 
executed for job table")
-                                            .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.restore_job_v4"));
+                                            .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.restore_job_v5"));
             assertThat(interceptedExecStmts).as("Create table should be 
executed for slice table")
                                             .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS sidecar_internal.restore_slice_v3"));
             assertThat(interceptedExecStmts).as("Create table should be 
executed for range table")
@@ -146,25 +146,27 @@ public class SidecarSchemaTest
                                             .anyMatch(stmt -> 
stmt.contains("CREATE TABLE IF NOT EXISTS 
sidecar_internal.role_permissions_v1"));
 
             List<String> expectedPrepStatements = Arrays.asList(
-            "INSERT INTO sidecar_internal.restore_job_v4 (  created_at,  
job_id,  keyspace_name,  table_name,  " +
-            "job_agent,  status,  blob_secrets,  import_options,  
consistency_level,  local_datacenter,  expire_at) " +
-            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            "INSERT INTO sidecar_internal.restore_job_v5 (  created_at,  
job_id,  keyspace_name,  table_name,  " +
+            "job_agent,  status,  blob_secrets,  import_options,  
consistency_level,  local_datacenter,  local_datacenter_only,  expire_at) " +
+            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
 
-            "INSERT INTO sidecar_internal.restore_job_v4 (  created_at,  
job_id,  blob_secrets) VALUES (?, ? ,?)",
+            "INSERT INTO sidecar_internal.restore_job_v5 (  created_at,  
job_id,  blob_secrets) VALUES (?, ? ,?)",
 
-            "INSERT INTO sidecar_internal.restore_job_v4 (  created_at,  
job_id,  status) VALUES (?, ?, ?)",
+            "INSERT INTO sidecar_internal.restore_job_v5 (  created_at,  
job_id,  status) VALUES (?, ?, ?)",
 
-            "INSERT INTO sidecar_internal.restore_job_v4 (  created_at,  
job_id,  job_agent) VALUES (?, ?, ?)",
+            "INSERT INTO sidecar_internal.restore_job_v5 (  created_at,  
job_id,  job_agent) VALUES (?, ?, ?)",
 
-            "INSERT INTO sidecar_internal.restore_job_v4 (  created_at,  
job_id,  expire_at) VALUES (?, ?, ?)",
+            "INSERT INTO sidecar_internal.restore_job_v5 (  created_at,  
job_id,  expire_at) VALUES (?, ?, ?)",
 
-            "INSERT INTO sidecar_internal.restore_job_v4 (  created_at,  
job_id,  slice_count) VALUES (?, ?, ?)",
+            "INSERT INTO sidecar_internal.restore_job_v5 (  created_at,  
job_id,  slice_count) VALUES (?, ?, ?)",
 
             "SELECT created_at, job_id, keyspace_name, table_name, job_agent, 
status, blob_secrets, import_options, " +
-            "consistency_level, local_datacenter, expire_at, slice_count FROM 
sidecar_internal.restore_job_v4 WHERE created_at = ? AND job_id = ?",
+            "consistency_level, local_datacenter, local_datacenter_only, 
expire_at, slice_count " +
+            "FROM sidecar_internal.restore_job_v5 WHERE created_at = ? AND 
job_id = ?",
 
             "SELECT created_at, job_id, keyspace_name, table_name, job_agent, 
status, blob_secrets, import_options, " +
-            "consistency_level, local_datacenter, expire_at, slice_count FROM 
sidecar_internal.restore_job_v4 WHERE created_at = ?",
+            "consistency_level, local_datacenter, local_datacenter_only, 
expire_at, slice_count " +
+            "FROM sidecar_internal.restore_job_v5 WHERE created_at = ?",
 
             "INSERT INTO sidecar_internal.restore_slice_v3 (  job_id,  
bucket_id,  slice_id,  bucket,  key,  " +
             "checksum,  start_token,  end_token,  compressed_size,  
uncompressed_size) " +
@@ -210,7 +212,7 @@ public class SidecarSchemaTest
                                             
.containsExactlyInAnyOrderElementsOf(expectedPrepStatements);
 
             assertThat(sidecarSchema.isInitialized()).as("Schema is 
successfully initialized").isTrue();
-            
assertTableSchema(sidecarSchema.tableSchema(RestoreJobsSchema.class), 
"sidecar_internal.restore_job_v4");
+            
assertTableSchema(sidecarSchema.tableSchema(RestoreJobsSchema.class), 
"sidecar_internal.restore_job_v5");
             
assertTableSchema(sidecarSchema.tableSchema(RestoreRangesSchema.class), 
"sidecar_internal.restore_range_v1");
             
assertTableSchema(sidecarSchema.tableSchema(RestoreSlicesSchema.class), 
"sidecar_internal.restore_slice_v3");
             
assertTableSchema(sidecarSchema.tableSchema(SidecarLeaseSchema.class), 
"sidecar_internal.sidecar_lease_v1");
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
index d4c3c384..9eec31fc 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.sidecar.TestModule;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.data.ConsistencyLevel;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
 import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
@@ -50,6 +51,7 @@ import 
org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.RestoreSliceTest;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
@@ -58,10 +60,12 @@ import 
org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
 import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import static 
org.apache.cassandra.sidecar.db.RestoreJobTest.createNewTestingJob;
 import static org.apache.cassandra.sidecar.db.RestoreJobTest.createTestingJob;
 import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob;
+import static 
org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.JMX;
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
@@ -90,6 +94,7 @@ class RestoreJobDiscovererTest
     private final SidecarSchema sidecarSchema = mock(SidecarSchema.class);
     private final RingTopologyRefresher ringTopologyRefresher = 
mock(RingTopologyRefresher.class);
     private final InstanceMetadataFetcher instanceMetadataFetcher = 
mock(InstanceMetadataFetcher.class);
+    private final NodeSettings mockNodeSettings = mock(NodeSettings.class);
     private SidecarMetrics metrics;
     private RestoreJobDiscoverer loop;
 
@@ -98,8 +103,9 @@ class RestoreJobDiscovererTest
     {
         MetricRegistryFactory mockRegistryFactory = 
mock(MetricRegistryFactory.class);
         when(mockRegistryFactory.getOrCreate()).thenReturn(registry());
-        InstanceMetadataFetcher mockMetadataFetcher = 
mock(InstanceMetadataFetcher.class);
-        metrics = new SidecarMetricsImpl(mockRegistryFactory, 
mockMetadataFetcher);
+        when(mockNodeSettings.datacenter()).thenReturn("dc1");
+        
when(instanceMetadataFetcher.callOnFirstAvailableInstance(any())).thenReturn(mockNodeSettings);
+        metrics = new SidecarMetricsImpl(mockRegistryFactory, 
instanceMetadataFetcher);
         loop = new RestoreJobDiscoverer(testConfig(),
                                         sidecarSchema,
                                         mockJobAccessor,
@@ -379,6 +385,59 @@ class RestoreJobDiscovererTest
         .isEqualTo(10);
     }
 
+    @Test
+    void testSkipNotOwnedRestoreToLocalDatacenterOnlyJob()
+    {
+        // Create a restore job that restores to dc2 only. Meanwhile, 
discoverer runs in dc1.
+        UUID jobId = UUIDs.timeBased();
+        when(mockJobAccessor.findAllRecent(anyLong(), anyInt()))
+        .thenReturn(Collections.singletonList(RestoreJob.builder()
+                                                        
.createdAt(RestoreJob.toLocalDate(jobId))
+                                                        .jobId(jobId)
+                                                        .jobAgent("agent")
+                                                        
.jobStatus(RestoreJobStatus.CREATED)
+                                                        .expireAt(new 
Date(System.currentTimeMillis() + 10000L))
+                                                        
.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
+                                                        .localDatacenter("dc2")
+                                                        
.shouldRestoreToLocalDatacenterOnly(true)
+                                                        .build()));
+        executeBlocking();
+        verify(mockRangeAccessor, never()).create(any());
+    }
+
+    @Test
+    void 
testRestoreToLocalDatacenterOnlyJobIsOnHoldWhenLocalDatacenterIsUndetermined()
+    {
+        // local datacenter is undetermined and the restore job is configured 
to restore to local datacenter only.
+        // the job is on hold until local datacenter is resolved in discoverer.
+        
when(instanceMetadataFetcher.callOnFirstAvailableInstance(any())).thenThrow(new 
CassandraUnavailableException(JMX, "NodeSettings unavailable"));
+        UUID jobId = UUIDs.timeBased();
+        when(mockJobAccessor.findAllRecent(anyLong(), anyInt()))
+        .thenReturn(Collections.singletonList(RestoreJob.builder()
+                                                        
.createdAt(RestoreJob.toLocalDate(jobId))
+                                                        .jobId(jobId)
+                                                        .jobAgent("agent")
+                                                        
.jobStatus(RestoreJobStatus.CREATED)
+                                                        .expireAt(new 
Date(System.currentTimeMillis() + 10000L))
+                                                        
.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
+                                                        .localDatacenter("dc1")
+                                                        
.shouldRestoreToLocalDatacenterOnly(true)
+                                                        .build()));
+
+        executeBlocking();
+        verify(ringTopologyRefresher, never()).register(any(), any());
+
+        // in the new run, the local datacenter is discovered. The discoverer 
proceeds further and register the job
+        Mockito.reset(instanceMetadataFetcher);
+        
when(instanceMetadataFetcher.callOnFirstAvailableInstance(any())).thenReturn(mockNodeSettings);
+        executeBlocking();
+        ArgumentCaptor<RestoreJob> restoreJobCaptor = 
ArgumentCaptor.forClass(RestoreJob.class);
+        verify(ringTopologyRefresher).register(restoreJobCaptor.capture(), 
any());
+        assertThat(restoreJobCaptor.getAllValues()).hasSize(1);
+        RestoreJob captured = restoreJobCaptor.getValue();
+        assertThat(captured.jobId).isEqualTo(jobId);
+    }
+
     private RestoreJobConfiguration testConfig()
     {
         RestoreJobConfiguration restoreJobConfiguration = 
mock(RestoreJobConfiguration.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to