This is an automated email from the ASF dual-hosted git repository.

yifan-c pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb2d55f9 CASSSIDECAR-422: SAI support in Sidecar (#333)
eb2d55f9 is described below

commit eb2d55f987ff90da552aacfb77b3c421d44f0340
Author: Shailaja Koppu <[email protected]>
AuthorDate: Fri May 15 19:38:49 2026 +0100

    CASSSIDECAR-422: SAI support in Sidecar (#333)
    
    Patch by Shailaja Koppu; Reviewed by Jyothsna konisa, Yifan Cai for 
CASSSIDECAR-422
---
 CHANGES.txt                                        |   1 +
 .../adapters/base/CassandraTableOperations.java    |  28 ++-
 .../adapters/cassandra50/Cassandra50Adapter.java   |  13 ++
 .../cassandra50/Cassandra50TableJmxOperations.java |  59 ++++++
 .../cassandra50/Cassandra50TableOperations.java}   |  44 ++---
 .../sidecar/common/data/SSTableImportOptions.java  |  35 +++-
 .../common/request/ImportSSTableRequest.java       |  34 ++++
 .../CreateRestoreJobRequestPayloadTest.java        |   8 +-
 .../sidecar/client/SidecarClientTest.java          |  45 +++++
 .../RestoreJobDiscovererSAIOptionsIntTest.java     | 207 ++++++++++++++++++++
 .../SSTableImportWithSaiIntegrationTest.java       | 215 +++++++++++++++++++++
 .../sidecar/common/server/TableOperations.java     |  32 +++
 server/build.gradle                                |   4 +
 .../CassandraInputValidationConfigurationImpl.java |   6 +-
 .../handlers/data/SSTableImportRequestParam.java   |  39 +++-
 .../sstableuploads/SSTableImportHandler.java       |   2 +
 .../sidecar/restore/RestoreRangeTask.java          |   2 +
 .../sidecar/utils/FastCassandraInputValidator.java |  11 +-
 .../cassandra/sidecar/utils/SSTableImporter.java   |  44 ++++-
 .../sstableuploads/SSTableImportHandlerTest.java   |  36 +++-
 .../sidecar/restore/RestoreRangeTaskTest.java      |  40 ++++
 .../sidecar/utils/SSTableImporterTest.java         |  15 +-
 .../sidecar/restore/RestoreJobTestUtils.java       |  17 +-
 23 files changed, 871 insertions(+), 66 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0ac2ac99..e721b48e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * SAI support in Sidecar (CASSSIDECAR-422)
  * Support column types not parseable by Java 3.x driver (CASSSIDECAR-443)
  * CdcManager.getInstanceId(instanceIp) returns -1 as it resolves ipAddress to 
null (CASSSIDECAR-417)
  * Fix circle CI pipelines OOM (CASSSIDECAR-423)
diff --git 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
index a4787f38..9975e45a 100644
--- 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
+++ 
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.NotNull;
  */
 public class CassandraTableOperations implements TableOperations
 {
-    private final JmxClient jmxClient;
+    protected final JmxClient jmxClient;
 
     public CassandraTableOperations(JmxClient jmxClient)
     {
@@ -65,6 +65,30 @@ public class CassandraTableOperations implements 
TableOperations
                                            copyData);
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * SAI index validation is only supported in Cassandra 5.0+; for older 
versions these parameters are ignored.
+     */
+    @Override
+    public List<String> importNewSSTables(@NotNull String keyspace,
+                                          @NotNull String tableName,
+                                          @NotNull String directory,
+                                          boolean resetLevel,
+                                          boolean clearRepaired,
+                                          boolean verifySSTables,
+                                          boolean verifyTokens,
+                                          boolean invalidateCaches,
+                                          boolean extendedVerify,
+                                          boolean copyData,
+                                          boolean failOnMissingIndex,
+                                          boolean validateIndexChecksum)
+    {
+        return importNewSSTables(keyspace, tableName, directory, resetLevel,
+                                 clearRepaired, verifySSTables, verifyTokens,
+                                 invalidateCaches, extendedVerify, copyData);
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -75,7 +99,7 @@ public class CassandraTableOperations implements 
TableOperations
                         .getDataPaths();
     }
 
-    String tableMBeanName(String keyspace, String tableName)
+    protected String tableMBeanName(String keyspace, String tableName)
     {
         return 
String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
                              tableName.contains(".") ? "IndexTables" : 
"Tables",
diff --git 
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
 
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
index f7ac798b..49b141cd 100644
--- 
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
+++ 
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
@@ -26,6 +26,7 @@ import 
org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
 import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.TableOperations;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
 import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher;
@@ -56,6 +57,18 @@ public class Cassandra50Adapter extends CassandraAdapter
         return new Cassandra50StorageOperations(jmxClient, dnsResolver);
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns Cassandra 5.0-specific TableOperations that supports SAI index 
validation
+     */
+    @Override
+    @NotNull
+    protected TableOperations createTableOperations(JmxClient jmxClient)
+    {
+        return new Cassandra50TableOperations(jmxClient);
+    }
+
     /**
      * {@inheritDoc}
      *
diff --git 
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableJmxOperations.java
 
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableJmxOperations.java
new file mode 100644
index 00000000..12b6214a
--- /dev/null
+++ 
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableJmxOperations.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra50;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.sidecar.adapters.base.jmx.TableJmxOperations;
+
+/**
+ * An interface that extends {@link TableJmxOperations} with the Cassandra 5.0 
MBean signature
+ * for {@code importNewSSTables}, which includes SAI index validation 
parameters.
+ */
+public interface Cassandra50TableJmxOperations extends TableJmxOperations
+{
+    /**
+     * Load new sstables from the given directory with SAI index validation 
support.
+     *
+     * @param srcPaths           the path to the new sstables - if it is an 
empty set, the data directories will
+     *                           be scanned
+     * @param resetLevel         if the level should be reset to 0 on the new 
sstables
+     * @param clearRepaired      if repaired info should be wiped from the new 
sstables
+     * @param verifySSTables     if the new sstables should be verified that 
they are not corrupt
+     * @param verifyTokens       if the tokens in the new sstables should be 
verified that they are owned by the
+     *                           current node
+     * @param invalidateCaches   if row cache should be invalidated for the 
keys in the new sstables
+     * @param extendedVerify     if we should run an extended verify checking 
all values in the new sstables
+     * @param copyData           if we should copy data from source paths 
instead of moving them
+     * @param failOnMissingIndex if SAI indexes should be validated during 
import
+     * @param validateIndexChecksum   if SAI index checksums should be 
verified during import
+     * @return list of failed import directories
+     */
+    List<String> importNewSSTables(Set<String> srcPaths,
+                                   boolean resetLevel,
+                                   boolean clearRepaired,
+                                   boolean verifySSTables,
+                                   boolean verifyTokens,
+                                   boolean invalidateCaches,
+                                   boolean extendedVerify,
+                                   boolean copyData,
+                                   boolean failOnMissingIndex,
+                                   boolean validateIndexChecksum);
+}
diff --git 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
 
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableOperations.java
similarity index 61%
copy from 
adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
copy to 
adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableOperations.java
index a4787f38..01511665 100644
--- 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
+++ 
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableOperations.java
@@ -16,27 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.adapters.base;
+package org.apache.cassandra.sidecar.adapters.cassandra50;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.cassandra.sidecar.adapters.base.jmx.TableJmxOperations;
+import org.apache.cassandra.sidecar.adapters.base.CassandraTableOperations;
 import org.apache.cassandra.sidecar.common.server.JmxClient;
-import org.apache.cassandra.sidecar.common.server.TableOperations;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * An implementation of the {@link TableOperations} that interfaces with 
Cassandra 4.0 and later
+ * A {@link CassandraTableOperations} extension for Cassandra 5.0 that 
supports SAI index validation
+ * parameters when importing SSTables.
  */
-public class CassandraTableOperations implements TableOperations
+public class Cassandra50TableOperations extends CassandraTableOperations
 {
-    private final JmxClient jmxClient;
-
-    public CassandraTableOperations(JmxClient jmxClient)
+    public Cassandra50TableOperations(JmxClient jmxClient)
     {
-        this.jmxClient = jmxClient;
+        super(jmxClient);
     }
 
     /**
@@ -52,9 +49,11 @@ public class CassandraTableOperations implements 
TableOperations
                                           boolean verifyTokens,
                                           boolean invalidateCaches,
                                           boolean extendedVerify,
-                                          boolean copyData)
+                                          boolean copyData,
+                                          boolean failOnMissingIndex,
+                                          boolean validateIndexChecksum)
     {
-        return jmxClient.proxy(TableJmxOperations.class, 
tableMBeanName(keyspace, tableName))
+        return jmxClient.proxy(Cassandra50TableJmxOperations.class, 
tableMBeanName(keyspace, tableName))
                         .importNewSSTables(Collections.singleton(directory),
                                            resetLevel,
                                            clearRepaired,
@@ -62,23 +61,8 @@ public class CassandraTableOperations implements 
TableOperations
                                            verifyTokens,
                                            invalidateCaches,
                                            extendedVerify,
-                                           copyData);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<String> getDataPaths(@NotNull String keyspace, @NotNull String 
table) throws IOException
-    {
-        return jmxClient.proxy(TableJmxOperations.class, 
tableMBeanName(keyspace, table))
-                        .getDataPaths();
-    }
-
-    String tableMBeanName(String keyspace, String tableName)
-    {
-        return 
String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
-                             tableName.contains(".") ? "IndexTables" : 
"Tables",
-                             keyspace, tableName);
+                                           copyData,
+                                           failOnMissingIndex,
+                                           validateIndexChecksum);
     }
 }
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
index 60b6a492..f9c62c44 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
@@ -18,14 +18,19 @@
 
 package org.apache.cassandra.sidecar.common.data;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 
 /**
  * Options for Cassandra import nodetool command. It is like properties.
  * Supports Json serialization and deserialization.
+ * <p>
+ * This class is used exclusively in the S3 restore job path ({@code 
CreateRestoreJobRequestPayload}).
  */
-public class SSTableImportOptions extends HashMap<String, String>
+public class SSTableImportOptions extends LinkedHashMap<String, String>
 {
+    public static final boolean DEFAULT_FAIL_ON_MISSING_INDEX = false;
+    public static final boolean DEFAULT_VALIDATE_INDEX_CHECKSUM = false;
+
     private static final String RESET_LEVEL = "resetLevel";
     private static final String CLEAR_REPAIRED = "clearRepaired";
     private static final String VERIFY_SSTABLES = "verifySSTables";
@@ -33,6 +38,8 @@ public class SSTableImportOptions extends HashMap<String, 
String>
     private static final String INVALIDATE_CACHES = "invalidateCaches";
     private static final String EXTENDED_VERIFY = "extendedVerify";
     private static final String COPY_DATA = "copyData";
+    private static final String FAIL_ON_MISSING_INDEX = "failOnMissingIndex";
+    private static final String VALIDATE_INDEX_CHECKSUM = 
"validateIndexChecksum";
 
     public static SSTableImportOptions defaults()
     {
@@ -48,6 +55,8 @@ public class SSTableImportOptions extends HashMap<String, 
String>
         put(INVALIDATE_CACHES, Boolean.toString(true));
         put(EXTENDED_VERIFY, Boolean.toString(true));
         put(COPY_DATA, Boolean.toString(false)); // note: the default is false
+        put(FAIL_ON_MISSING_INDEX, 
Boolean.toString(DEFAULT_FAIL_ON_MISSING_INDEX));
+        put(VALIDATE_INDEX_CHECKSUM, 
Boolean.toString(DEFAULT_VALIDATE_INDEX_CHECKSUM));
     }
 
     public SSTableImportOptions resetLevel(boolean enabled)
@@ -126,4 +135,26 @@ public class SSTableImportOptions extends HashMap<String, 
String>
     {
         return Boolean.parseBoolean(get(COPY_DATA));
     }
+
+    public SSTableImportOptions failOnMissingIndex(boolean enabled)
+    {
+        put(FAIL_ON_MISSING_INDEX, Boolean.toString(enabled));
+        return this;
+    }
+
+    public boolean failOnMissingIndex()
+    {
+        return Boolean.parseBoolean(get(FAIL_ON_MISSING_INDEX));
+    }
+
+    public SSTableImportOptions validateIndexChecksum(boolean enabled)
+    {
+        put(VALIDATE_INDEX_CHECKSUM, Boolean.toString(enabled));
+        return this;
+    }
+
+    public boolean validateIndexChecksum()
+    {
+        return Boolean.parseBoolean(get(VALIDATE_INDEX_CHECKSUM));
+    }
 }
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
index 7633b944..19919db3 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
@@ -65,6 +65,8 @@ public class ImportSSTableRequest extends 
JsonRequest<SSTableImportResponse>
         private Boolean invalidateCaches;
         private Boolean extendedVerify;
         private Boolean copyData;
+        private Boolean failOnMissingIndex;
+        private Boolean validateIndexChecksum;
 
         public ImportOptions()
         {
@@ -153,6 +155,30 @@ public class ImportSSTableRequest extends 
JsonRequest<SSTableImportResponse>
             this.copyData = copyData;
             return this;
         }
+
+        /**
+         * Sets the {@code failOnMissingIndex} and returns a reference to this 
ImportOptions enabling method chaining.
+         *
+         * @param failOnMissingIndex the {@code failOnMissingIndex} to set
+         * @return a reference to this ImportOptions
+         */
+        public ImportOptions failOnMissingIndex(boolean failOnMissingIndex)
+        {
+            this.failOnMissingIndex = failOnMissingIndex;
+            return this;
+        }
+
+        /**
+         * Sets the {@code validateIndexChecksum} and returns a reference to 
this ImportOptions enabling method chaining.
+         *
+         * @param validateIndexChecksum the {@code validateIndexChecksum} to 
set
+         * @return a reference to this ImportOptions
+         */
+        public ImportOptions validateIndexChecksum(boolean 
validateIndexChecksum)
+        {
+            this.validateIndexChecksum = validateIndexChecksum;
+            return this;
+        }
     }
 
     static String requestURI(String keyspace, String tableName, String 
uploadId, ImportOptions importOptions)
@@ -205,6 +231,14 @@ public class ImportSSTableRequest extends 
JsonRequest<SSTableImportResponse>
         {
             options.add("copyData=" + importOptions.copyData);
         }
+        if (importOptions.failOnMissingIndex != null)
+        {
+            options.add("failOnMissingIndex=" + 
importOptions.failOnMissingIndex);
+        }
+        if (importOptions.validateIndexChecksum != null)
+        {
+            options.add("validateIndexChecksum=" + 
importOptions.validateIndexChecksum);
+        }
 
         return options;
     }
diff --git 
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
index e1f831b9..55628270 100644
--- 
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
+++ 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
@@ -65,13 +65,15 @@ class CreateRestoreJobRequestPayloadTest
                                    "\"jobAgent\":\"agent\"," +
                                    "\"secrets\":" + 
MAPPER.writeValueAsString(secrets) + "," +
                                    "\"importOptions\":{" +
-                                   "\"verifyTokens\":\"true\"," +
                                    "\"resetLevel\":\"true\"," +
                                    "\"clearRepaired\":\"true\"," +
-                                   "\"extendedVerify\":\"true\"," +
                                    "\"verifySSTables\":\"true\"," +
+                                   "\"verifyTokens\":\"true\"," +
                                    "\"invalidateCaches\":\"true\"," +
-                                   "\"copyData\":\"false\"}," +
+                                   "\"extendedVerify\":\"true\"," +
+                                   "\"copyData\":\"false\"," +
+                                   "\"failOnMissingIndex\":\"false\"," +
+                                   "\"validateIndexChecksum\":\"false\"}," +
                                    "\"expireAt\":" + expireAt + "," +
                                    "\"consistencyLevel\":\"LOCAL_QUORUM\"," +
                                    "\"localDatacenter\":\"DC1\"}");
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 336f636f..3cb732d7 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -848,6 +848,51 @@ abstract class SidecarClientTest
         assertThat(request.getMethod()).isEqualTo("PUT");
     }
 
+    @Test
+    void testSSTableImportWithSaiOptions() throws Exception
+    {
+        String responseAsString = 
"{\"success\":true,\"uploadId\":\"0000-0000\",\"keyspace\":\"cycling\"," +
+                                  "\"tableName\":\"cyclist_name\"}";
+        MockResponse response = new 
MockResponse().setResponseCode(OK.code()).setBody(responseAsString);
+        SidecarInstanceImpl sidecarInstance = instances.get(0);
+        MockWebServer mockWebServer = servers.get(0);
+        mockWebServer.enqueue(response);
+
+        ImportSSTableRequest.ImportOptions options = new 
ImportSSTableRequest.ImportOptions()
+                                                     .resetLevel(true)
+                                                     .clearRepaired(true)
+                                                     .verifySSTables(true)
+                                                     .verifyTokens(true)
+                                                     .invalidateCaches(true)
+                                                     .extendedVerify(true)
+                                                     .copyData(true)
+                                                     .failOnMissingIndex(true)
+                                                     
.validateIndexChecksum(true);
+        SSTableImportResponse result = 
client.importSSTableRequest(sidecarInstance,
+                                                                   "cycling",
+                                                                   
"cyclist_name",
+                                                                   "0000-0000",
+                                                                   options)
+                                             .get(30, TimeUnit.SECONDS);
+
+        assertThat(result).isNotNull();
+        assertThat(result.keyspace()).isEqualTo("cycling");
+        assertThat(result.tableName()).isEqualTo("cyclist_name");
+        assertThat(result.success()).isTrue();
+        assertThat(result.uploadId()).isEqualTo("0000-0000");
+
+        assertThat(mockWebServer.getRequestCount()).isEqualTo(1);
+        RecordedRequest request = mockWebServer.takeRequest();
+        
assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SSTABLE_IMPORT_ROUTE
+                                                
.replaceAll(KEYSPACE_PATH_PARAM, "cycling")
+                                                
.replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
+                                                
.replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")
+                                                + 
"?resetLevel=true&clearRepaired=true&verifySSTables=true&" +
+                                                
"verifyTokens=true&invalidateCaches=true&extendedVerify=true&" +
+                                                
"copyData=true&failOnMissingIndex=true&validateIndexChecksum=true");
+        assertThat(request.getMethod()).isEqualTo("PUT");
+    }
+
     @Test
     void testUploadSSTableFailsWhenFileDoesNotExist()
     {
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
new file mode 100644
index 00000000..d2dcf31e
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore.jobdiscoverer;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.inject.AbstractModule;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
+import 
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
+import 
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
+import 
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.db.RestoreJob;
+import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.RestoreRange;
+import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
+import org.apache.cassandra.sidecar.restore.RestoreJobDiscoverer;
+import org.apache.cassandra.sidecar.restore.RestoreJobTestUtils;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob;
+import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests verifying that SAI import options (failOnMissingIndex, 
validateIndexChecksum) are
+ * correctly persisted and propagated through the restore job pipeline: create 
job -> persist -> discover -> create ranges.
+ */
+class RestoreJobDiscovererSAIOptionsIntTest extends 
SharedClusterSidecarIntegrationTestBase
+{
+    private static final SimpleCassandraVersion MIN_VERSION_WITH_SAI = 
SimpleCassandraVersion.create("5.0.0");
+    private static final String TEST_KEYSPACE = "sai_import_options_ks";
+    private static final QualifiedName TABLE_NAME = new 
QualifiedName(TEST_KEYSPACE, "test_table");
+
+    @Override
+    protected void beforeClusterProvisioning()
+    {
+        SimpleCassandraVersion version = 
SimpleCassandraVersion.create(testVersion.version());
+        assumeThat(version)
+        .as("SAI indexes are only available in Cassandra 5.0 and later")
+        .isGreaterThanOrEqualTo(MIN_VERSION_WITH_SAI);
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .nodesPerDc(3)
+                    .tokenSupplier(nodeIndex ->
+                        Collections.singletonList(String.valueOf(new long[]{ 
0, 1000L, 2000L }[nodeIndex - 1])));
+    }
+
+    @Override
+    protected void startSidecar(ICluster<? extends IInstance> cluster) throws 
InterruptedException
+    {
+        serverWrapper = startSidecarWithInstances(List.of(cluster.get(1)), 
(AbstractModule) disableRestoreProcessor());
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, Map.of("datacenter1", 2));
+        createTestTable(TABLE_NAME, "CREATE TABLE %s (id text PRIMARY KEY, 
name text)");
+    }
+
+    @Override
+    protected void beforeTestStart()
+    {
+        waitForSchemaReady(30, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testDefaultImportOptions()
+    {
+        QualifiedTableName tableName = new 
QualifiedTableName(TABLE_NAME.keyspace(), TABLE_NAME.table());
+        RestoreJobTestUtils.RestoreJobClient testClient =
+                RestoreJobTestUtils.client(trustedClient(), "localhost", 
serverWrapper.serverPort);
+
+        // create job with default options
+        UUID jobId = createJob(testClient, tableName);
+
+        RestoreJobDatabaseAccessor jobAccessor = 
serverWrapper.injector.getInstance(RestoreJobDatabaseAccessor.class);
+        RestoreJob job = jobAccessor.find(jobId);
+        SSTableImportOptions importOptions = job.importOptions;
+        // verify default SAI options on persisted job
+        assertThat(importOptions.failOnMissingIndex()).isFalse();
+        assertThat(importOptions.validateIndexChecksum()).isFalse();
+
+        short bucketId = 0;
+        CreateSliceRequestPayload slicePayload = new 
CreateSliceRequestPayload("sliceId", bucketId, "bucket", "key",
+                "checksum", BigInteger.valueOf(1L), BigInteger.valueOf(1600L),
+                100L, 100L);
+        testClient.createRestoreSlice(tableName, jobId, slicePayload);
+        testClient.updateRestoreJob(tableName, jobId,
+                
UpdateRestoreJobRequestPayload.builder().withStatus(RestoreJobStatus.STAGE_READY).build());
+
+        RestoreJobDiscoverer restoreJobDiscoverer = 
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
+        restoreJobDiscoverer.tryExecuteDiscovery();
+
+        // verify ranges were created and link to the correct job
+        RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+        assertThat(ranges).isNotEmpty();
+        for (RestoreRange range : ranges)
+        {
+            assertThat(range.jobId()).isEqualTo(jobId);
+        }
+
+        // Re-read the job after discovery to confirm default importOptions 
are preserved
+        RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
+        assertThat(jobAfterDiscovery.importOptions.failOnMissingIndex())
+                .describedAs("failOnMissingIndex should be false by default 
after discovery")
+                .isFalse();
+        assertThat(jobAfterDiscovery.importOptions.validateIndexChecksum())
+                .describedAs("validateIndexChecksum should be false by default 
after discovery")
+                .isFalse();
+    }
+
+    @Test
+    void testSaiImportOptionsPropagatedToRestoreRanges()
+    {
+        QualifiedTableName tableName = new 
QualifiedTableName(TABLE_NAME.keyspace(), TABLE_NAME.table());
+        RestoreJobTestUtils.RestoreJobClient testClient =
+        RestoreJobTestUtils.client(trustedClient(), "localhost", 
serverWrapper.serverPort);
+
+        // create job with SAI options enabled
+        Consumer<CreateRestoreJobRequestPayload.Builder> enableSaiOptions = 
builder ->
+            builder.updateImportOptions(opts -> 
opts.failOnMissingIndex(true).validateIndexChecksum(true));
+        UUID jobId = createJob(testClient, tableName, enableSaiOptions);
+
+        // verify SAI options are persisted on the job
+        RestoreJobDatabaseAccessor jobAccessor = 
serverWrapper.injector.getInstance(RestoreJobDatabaseAccessor.class);
+        RestoreJob job = jobAccessor.find(jobId);
+        SSTableImportOptions importOptions = job.importOptions;
+        assertThat(importOptions.failOnMissingIndex()).isTrue();
+        assertThat(importOptions.validateIndexChecksum()).isTrue();
+        // verify other defaults are preserved
+        assertThat(importOptions.resetLevel()).isTrue();
+        assertThat(importOptions.clearRepaired()).isTrue();
+        assertThat(importOptions.verifySSTables()).isTrue();
+        assertThat(importOptions.verifyTokens()).isTrue();
+        assertThat(importOptions.invalidateCaches()).isTrue();
+        assertThat(importOptions.extendedVerify()).isTrue();
+        assertThat(importOptions.copyData()).isFalse();
+
+        // create slice and discover ranges
+        short bucketId = 0;
+        CreateSliceRequestPayload slicePayload = new 
CreateSliceRequestPayload("sliceId", bucketId, "bucket", "key",
+                                                                               
"checksum", BigInteger.valueOf(1L), BigInteger.valueOf(1600L),
+                                                                               
100L, 100L);
+        testClient.createRestoreSlice(tableName, jobId, slicePayload);
+        testClient.updateRestoreJob(tableName, jobId,
+                                    
UpdateRestoreJobRequestPayload.builder().withStatus(RestoreJobStatus.STAGE_READY).build());
+
+        RestoreJobDiscoverer restoreJobDiscoverer = 
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
+        restoreJobDiscoverer.tryExecuteDiscovery();
+
+        // verify ranges were created by the discoverer and link back to the 
correct job
+        RestoreRangeDatabaseAccessor rangeDatabaseAccessor = 
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+        List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId, 
bucketId);
+        assertThat(ranges).isNotEmpty();
+        for (RestoreRange range : ranges)
+        {
+            assertThat(range.jobId()).isEqualTo(jobId);
+        }
+
+        // Re-read the job after discovery to confirm importOptions are still 
intact.
+        RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
+        assertThat(jobAfterDiscovery.importOptions.failOnMissingIndex())
+        .describedAs("failOnMissingIndex should be true after discovery")
+        .isTrue();
+        assertThat(jobAfterDiscovery.importOptions.validateIndexChecksum())
+        .describedAs("validateIndexChecksum should be true after discovery")
+        .isTrue();
+    }
+}
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportWithSaiIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportWithSaiIntegrationTest.java
new file mode 100644
index 00000000..fc485db1
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportWithSaiIntegrationTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.sstableuploads;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.TestUtils;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.TEST_TABLE_PREFIX;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests for SSTable import with SAI (Storage Attached Index) 
parameters.
+ * Validates that SSTables with SAI index metadata can be uploaded and 
imported via the Sidecar REST API,
+ * including proper handling of SAI index files and query parameters ({@code 
failOnMissingIndex},
+ * {@code validateIndexChecksum}).
+ */
+class SSTableImportWithSaiIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
+{
+    private static final SimpleCassandraVersion MIN_VERSION_WITH_SAI = 
SimpleCassandraVersion.create("5.0.0");
+    private static final String WITH_COMPACTION_DISABLED = " WITH COMPACTION = 
{\n" +
+                                                           "   'class': 
'SizeTieredCompactionStrategy', \n" +
+                                                           "   'enabled': 
'false' }";
+
+    static final QualifiedName SAI_TABLE = 
TestUtils.uniqueTestTableFullName(TEST_KEYSPACE, TEST_TABLE_PREFIX);
+
+    @Override
+    protected void beforeClusterProvisioning()
+    {
+        SimpleCassandraVersion version = 
SimpleCassandraVersion.create(testVersion.version());
+        assumeThat(version)
+        .as("SAI indexes are only available in Cassandra 5.0 and later")
+        .isGreaterThanOrEqualTo(MIN_VERSION_WITH_SAI);
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        createTestTable(SAI_TABLE,
+                        "CREATE TABLE IF NOT EXISTS %s (id text, value text, 
PRIMARY KEY(id))"
+                        + WITH_COMPACTION_DISABLED + ";");
+
+        cluster.schemaChangeIgnoringStoppedInstances(
+        String.format("CREATE CUSTOM INDEX IF NOT EXISTS %s_sai_idx ON %s 
(value) " +
+                       "USING 
'org.apache.cassandra.index.sai.StorageAttachedIndex';",
+                       SAI_TABLE.table(), SAI_TABLE));
+
+        cluster.schemaChangeIgnoringStoppedInstances(
+        String.format("INSERT INTO %s (id, value) VALUES ('a', 'val_a');", 
SAI_TABLE));
+        cluster.schemaChangeIgnoringStoppedInstances(
+        String.format("INSERT INTO %s (id, value) VALUES ('b', 'val_b');", 
SAI_TABLE));
+    }
+
+    @Test
+    void testSSTableImportWithSaiIndexParams() throws Exception
+    {
+        String snapshotName = SAI_TABLE.table() + "-snapshot";
+
+        // Take snapshot
+        cluster.get(1).nodetoolResult("snapshot",
+                                       "--tag", snapshotName,
+                                       "--table", SAI_TABLE.table(),
+                                       "--", SAI_TABLE.keyspace())
+               .asserts().success();
+
+        // Find snapshot files on the filesystem
+        List<Path> snapshotFiles = findChildFile("127.0.0.1", 
SAI_TABLE.keyspace(), snapshotName);
+        assertThat(snapshotFiles).isNotEmpty();
+
+        List<Path> filesToUpload = snapshotFiles.stream()
+                                                .filter(p -> 
p.toFile().isFile())
+                                                .collect(Collectors.toList());
+        assertThat(filesToUpload).isNotEmpty();
+
+        // SAI index files have '+' in their filename
+        assertThat(filesToUpload.stream().anyMatch(p -> 
p.getFileName().toString().contains("+")))
+        .withFailMessage("Expected at least one snapshot file with '+' in its 
name (SAI index file)")
+        .isTrue();
+
+        // Upload snapshot files via the REST upload endpoint
+        UUID uploadId = UUID.randomUUID();
+        for (Path path : filesToUpload)
+        {
+            String fileName = path.getFileName().toString();
+            // URLEncoder.encode encodes space as '+', which is correct for 
query params but not path segments.
+            // For path segments, '+' is a literal character, so we must 
encode it as %2B.
+            String encodedFileName = URLEncoder.encode(fileName, 
StandardCharsets.UTF_8)
+                                               .replace("+", "%2B");
+
+            String uploadRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" 
+ SAI_TABLE.keyspace()
+                                 + "/tables/" + SAI_TABLE.table() + 
"/components/" + encodedFileName;
+            Buffer fileContent = Buffer.buffer(Files.readAllBytes(path));
+            HttpResponse<Buffer> uploadResponse = getBlocking(
+            trustedClient().put(serverWrapper.serverPort, "127.0.0.1", 
uploadRoute)
+                           .sendBuffer(fileContent));
+            assertThat(uploadResponse.statusCode())
+            .withFailMessage("Upload failed for " + fileName + " with status " 
+ uploadResponse.statusCode()
+                             + ": " + uploadResponse.bodyAsString())
+            .isEqualTo(HttpResponseStatus.OK.code());
+        }
+
+        // Truncate table and wait until empty
+        cluster.schemaChangeIgnoringStoppedInstances(String.format("TRUNCATE 
TABLE %s", SAI_TABLE));
+        loopAssert(30, () -> {
+            Object[][] rows = cluster.getFirstRunningInstance()
+                                     .coordinator()
+                                     .execute(String.format("SELECT * FROM 
%s", SAI_TABLE),
+                                              ConsistencyLevel.LOCAL_QUORUM);
+            assertThat(rows == null || rows.length == 0)
+            .withFailMessage("Table should be empty after truncation")
+            .isTrue();
+        });
+
+        // Import with SAI index query params, poll until completed
+        String importRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + 
SAI_TABLE.keyspace()
+                             + "/tables/" + SAI_TABLE.table() + "/import";
+        loopAssert(300, 1000, () -> {
+            HttpResponse<Buffer> importResponse = getBlocking(
+            trustedClient().put(serverWrapper.serverPort, "127.0.0.1", 
importRoute)
+                           .addQueryParam("failOnMissingIndex", "true")
+                           .addQueryParam("validateIndexChecksum", "true")
+                           .send());
+            
assertThat(importResponse.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+        });
+
+        // Verify SAI index component files exist on the filesystem.
+        // No new data was written after truncate, so any SAI files must have 
come from the imported snapshot.
+        InstanceMetadata instance = 
serverWrapper.injector.getInstance(InstancesMetadata.class)
+                                                          
.instanceFromHost("127.0.0.1");
+        String dataDir = instance.dataDirs().get(0);
+        Path keyspacePath = Paths.get(dataDir, SAI_TABLE.keyspace());
+        Path tableDir;
+        try (Stream<Path> dirs = Files.list(keyspacePath))
+        {
+            tableDir = dirs.filter(dir -> 
dir.getFileName().toString().startsWith(SAI_TABLE.table()))
+                           .findFirst()
+                           .orElseThrow(() -> new AssertionError("Table 
directory not found"));
+        }
+
+        List<Path> saiFiles;
+        try (Stream<Path> files = Files.list(tableDir))
+        {
+            saiFiles = files.filter(f -> 
f.getFileName().toString().toUpperCase().contains("SAI"))
+                            .collect(Collectors.toList());
+        }
+        assertThat(saiFiles)
+        .withFailMessage("Expected SAI index component files on the filesystem 
after import")
+        .isNotEmpty();
+
+        // Verify imported data is present
+        assertThat(queryIds()).containsExactlyInAnyOrder("a", "b");
+
+        // Add new data after import
+        cluster.schemaChangeIgnoringStoppedInstances(
+        String.format("INSERT INTO %s (id, value) VALUES ('c', 'val_c');", 
SAI_TABLE));
+        cluster.schemaChangeIgnoringStoppedInstances(
+        String.format("INSERT INTO %s (id, value) VALUES ('d', 'val_d');", 
SAI_TABLE));
+
+        // Verify all data (imported + new) is present
+        assertThat(queryIds()).containsExactlyInAnyOrder("a", "b", "c", "d");
+    }
+
+    private List<String> queryIds()
+    {
+        Object[][] rows = cluster.getFirstRunningInstance()
+                                  .coordinator()
+                                  .execute(String.format("SELECT id FROM %s", 
SAI_TABLE),
+                                           ConsistencyLevel.LOCAL_QUORUM);
+        return Stream.of(rows)
+                     .map(row -> (String) row[0])
+                     .collect(Collectors.toList());
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
index 3d1570ae..03be9ac5 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
@@ -55,6 +55,38 @@ public interface TableOperations
                                    boolean extendedVerify,
                                    boolean copyData);
 
+    /**
+     * Load new SSTables from the given {@code directory} with SAI index 
validation options.
+     * Defaults to ignoring SAI flags for Cassandra versions that do not 
support them.
+     *
+     * @param keyspace           the keyspace in Cassandra
+     * @param tableName          the table name in Cassandra
+     * @param directory          the directory to the new SSTables
+     * @param resetLevel         if the level should be reset to 0 on the new 
SSTables
+     * @param clearRepaired      if repaired info should be wiped from the new 
SSTables
+     * @param verifySSTables     if the new SSTables should be verified that 
they are not corrupt
+     * @param verifyTokens       if the tokens in the new SSTables should be 
verified that they are owned by the
+     *                           current node
+     * @param invalidateCaches   if row cache should be invalidated for the 
keys in the new SSTables
+     * @param extendedVerify     if we should run an extended verify checking 
all values in the new SSTables
+     * @param copyData           if we should copy data from source paths 
instead of moving them
+     * @param failOnMissingIndex if SAI indexes should be validated during 
import
+     * @param validateIndexChecksum   if SAI index checksums should be 
verified during import
+     * @return list of failed import directories
+     */
+    List<String> importNewSSTables(@NotNull String keyspace,
+                                   @NotNull String tableName,
+                                   @NotNull String directory,
+                                   boolean resetLevel,
+                                   boolean clearRepaired,
+                                   boolean verifySSTables,
+                                   boolean verifyTokens,
+                                   boolean invalidateCaches,
+                                   boolean extendedVerify,
+                                   boolean copyData,
+                                   boolean failOnMissingIndex,
+                                   boolean validateIndexChecksum);
+
     /**
      * Returns a list of data directories for the given {@code table}.
      *
diff --git a/server/build.gradle b/server/build.gradle
index 9cf929a2..8b2ae702 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -186,6 +186,8 @@ dependencies {
 
     testFixturesApi(testFixtures(project(":server-common")))
     testFixturesApi(testFixtures(project(":vertx-auth-mtls")))
+    testFixturesApi(testFixtures(project(":client-common")))
+    testFixturesApi(testFixtures(project(":test-common")))
     testFixturesImplementation("io.vertx:vertx-junit5:${project.vertxVersion}")
     testFixturesImplementation("com.google.inject:guice:${guiceVersion}")
     testFixturesImplementation('org.mockito:mockito-core:4.10.0')
@@ -195,6 +197,8 @@ dependencies {
     testFixturesImplementation("io.vertx:vertx-web:${project.vertxVersion}") {
         exclude group: 'junit', module: 'junit'
     }
+    
testFixturesImplementation("io.vertx:vertx-web-client:${project.vertxVersion}")
+    
testFixturesImplementation('com.datastax.cassandra:cassandra-driver-core:3.11.3')
     integrationTestImplementation(testFixtures(project(":test-common")))
     integrationTestImplementation 
"org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
     // Needed by the Cassandra dtest framework
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
index db8f4218..7fa29fca 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
@@ -53,10 +53,12 @@ public class CassandraInputValidationConfigurationImpl 
implements CassandraInput
     public static final String DEFAULT_ALLOWED_CHARS_FOR_QUOTED_NAME = 
"[a-zA-Z_0-9]{1,48}";
     public static final String ALLOWED_CHARS_FOR_COMPONENT_NAME_PROPERTY = 
"allowed_chars_for_component_name";
     public static final String DEFAULT_ALLOWED_CHARS_FOR_COMPONENT_NAME =
-    "[a-zA-Z0-9_-]+(\\.db|\\.cql|\\.json|\\.crc32|TOC\\.txt)";
+    "[a-zA-Z0-9_+\\-]+(\\.db|\\.cql|\\.json|\\.crc32|TOC\\.txt)";
     public static final String 
ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME_PROPERTY =
     "allowed_chars_for_restricted_component_name";
-    public static final String 
DEFAULT_ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME = 
"[a-zA-Z0-9_-]+(\\.db|TOC\\.txt)";
+    // '+' is included in the allowed characters to support downloading SAI 
files in snapshots
+    public static final String 
DEFAULT_ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME =
+    "[a-zA-Z0-9_+\\-]+(\\.db|TOC\\.txt)";
 
     @JsonProperty(value = VALIDATOR_PROPERTY)
     protected final ParameterizedClassConfiguration validatorConfiguration;
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
index 280a6daa..80f548cf 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
 import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
 
 import static 
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
@@ -39,6 +40,8 @@ public class SSTableImportRequestParam extends SSTableUploads
     private final boolean invalidateCaches;
     private final boolean extendedVerify;
     private final boolean copyData;
+    private final boolean failOnMissingIndex;
+    private final boolean validateIndexChecksum;
 
     /**
      * Constructs an SSTableImportRequest
@@ -53,10 +56,13 @@ public class SSTableImportRequestParam extends 
SSTableUploads
      * @param invalidateCaches   if row cache should be invalidated for the 
keys in the new SSTables
      * @param extendedVerify     if we should run an extended verify checking 
all values in the new SSTables
      * @param copyData           if we should copy data from source paths 
instead of moving them
+     * @param failOnMissingIndex if SAI indexes should be validated during 
import
+     * @param validateIndexChecksum   if SAI index checksums should be 
verified during import
      */
     public SSTableImportRequestParam(QualifiedTableName qualifiedTableName, 
String uploadId, boolean resetLevel,
                                      boolean clearRepaired, boolean 
verifySSTables, boolean verifyTokens,
-                                     boolean invalidateCaches, boolean 
extendedVerify, boolean copyData)
+                                     boolean invalidateCaches, boolean 
extendedVerify, boolean copyData,
+                                     boolean failOnMissingIndex, boolean 
validateIndexChecksum)
     {
         super(qualifiedTableName, uploadId);
         this.resetLevel = resetLevel;
@@ -66,6 +72,8 @@ public class SSTableImportRequestParam extends SSTableUploads
         this.invalidateCaches = invalidateCaches;
         this.extendedVerify = extendedVerify;
         this.copyData = copyData;
+        this.failOnMissingIndex = failOnMissingIndex;
+        this.validateIndexChecksum = validateIndexChecksum;
     }
 
     /**
@@ -125,6 +133,22 @@ public class SSTableImportRequestParam extends 
SSTableUploads
         return copyData;
     }
 
+    /**
+     * @return true if SAI indexes should be validated during import, false 
otherwise
+     */
+    public boolean failOnMissingIndex()
+    {
+        return failOnMissingIndex;
+    }
+
+    /**
+     * @return true if SAI index checksums should be verified during import, 
false otherwise
+     */
+    public boolean validateIndexChecksum()
+    {
+        return validateIndexChecksum;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -140,6 +164,8 @@ public class SSTableImportRequestParam extends 
SSTableUploads
                && invalidateCaches == that.invalidateCaches
                && extendedVerify == that.extendedVerify
                && copyData == that.copyData
+               && failOnMissingIndex == that.failOnMissingIndex
+               && validateIndexChecksum == that.validateIndexChecksum
                && uploadId().equals(that.uploadId())
                && keyspace().equals(that.keyspace())
                && table().equals(that.table());
@@ -151,7 +177,8 @@ public class SSTableImportRequestParam extends 
SSTableUploads
     public int hashCode()
     {
         return Objects.hash(uploadId(), keyspace(), table(), resetLevel, 
clearRepaired, verifySSTables,
-                            verifyTokens, invalidateCaches, extendedVerify, 
copyData);
+                            verifyTokens, invalidateCaches, extendedVerify, 
copyData,
+                            failOnMissingIndex, validateIndexChecksum);
     }
 
     /**
@@ -170,6 +197,8 @@ public class SSTableImportRequestParam extends 
SSTableUploads
                ", invalidateCaches=" + invalidateCaches +
                ", extendedVerify=" + extendedVerify +
                ", copyData=" + copyData +
+               ", failOnMissingIndex=" + failOnMissingIndex +
+               ", validateIndexChecksum=" + validateIndexChecksum +
                '}';
     }
 
@@ -191,6 +220,10 @@ public class SSTableImportRequestParam extends 
SSTableUploads
                                              parseBooleanQueryParam(request, 
"verifyTokens", true),
                                              parseBooleanQueryParam(request, 
"invalidateCaches", true),
                                              parseBooleanQueryParam(request, 
"extendedVerify", true),
-                                             parseBooleanQueryParam(request, 
"copyData", false));
+                                             parseBooleanQueryParam(request, 
"copyData", false),
+                                             parseBooleanQueryParam(request, 
"failOnMissingIndex",
+                                                                    
SSTableImportOptions.DEFAULT_FAIL_ON_MISSING_INDEX),
+                                             parseBooleanQueryParam(request, 
"validateIndexChecksum",
+                                                                    
SSTableImportOptions.DEFAULT_VALIDATE_INDEX_CHECKSUM));
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
index 13493029..bf00b238 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
@@ -208,6 +208,8 @@ public class SSTableImportHandler extends 
AbstractHandler<SSTableImportRequestPa
                .invalidateCaches(request.invalidateCaches())
                .extendedVerify(request.extendedVerify())
                .copyData(request.copyData())
+               .failOnMissingIndex(request.failOnMissingIndex())
+               .validateIndexChecksum(request.validateIndexChecksum())
                .build();
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
index 544cfaec..02f8cc52 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
@@ -580,6 +580,8 @@ public class RestoreRangeTask implements RestoreRangeHandler
                                                       
.invalidateCaches(options.invalidateCaches())
                                                       
.extendedVerify(options.extendedVerify())
                                                       
.copyData(options.copyData())
+                                                      
.failOnMissingIndex(options.failOnMissingIndex())
+                                                      
.validateIndexChecksum(options.validateIndexChecksum())
                                                       
.uploadId(range.uploadId())
                                                       .build();
         Future<Void> future = importer.scheduleImport(importOptions)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
index 99a0f70b..c01038c1 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
@@ -254,7 +254,7 @@ public class FastCassandraInputValidator extends 
RegexBasedCassandraInputValidat
      */
     protected boolean isValidComponentNameCharacter(char c)
     {
-        return isAlphanumeric(c) || isUnderscore(c) || isDash(c);
+        return isAlphanumeric(c) || isUnderscore(c) || isDash(c) || isPlus(c);
     }
 
     /**
@@ -293,6 +293,15 @@ public class FastCassandraInputValidator extends 
RegexBasedCassandraInputValidat
         return c == '-';
     }
 
+    /**
+     * @param c the character to test
+     * @return {@code true} if the input {@code c} is a plus sign, {@code 
false} otherwise
+     */
+    protected boolean isPlus(char c)
+    {
+        return c == '+';
+    }
+
     /**
      * @param configMap    the configuration map
      * @param key          the key in the map
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index e0d0cbde..31c095d8 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -39,6 +39,7 @@ import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.ext.web.handler.HttpException;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
 import org.apache.cassandra.sidecar.common.server.TableOperations;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
@@ -64,6 +65,8 @@ public class SSTableImporter
     public static final boolean DEFAULT_INVALIDATE_CACHES = true;
     public static final boolean DEFAULT_EXTENDED_VERIFY = true;
     public static final boolean DEFAULT_COPY_DATA = false;
+    public static final boolean DEFAULT_FAIL_ON_MISSING_INDEX = 
SSTableImportOptions.DEFAULT_FAIL_ON_MISSING_INDEX;
+    public static final boolean DEFAULT_VALIDATE_INDEX_CHECKSUM = 
SSTableImportOptions.DEFAULT_VALIDATE_INDEX_CHECKSUM;
     private final Vertx vertx;
     private final ExecutorPools executorPools;
     private final InstanceMetadataFetcher metadataFetcher;
@@ -231,7 +234,9 @@ public class SSTableImporter
                                                   options.verifyTokens,
                                                   options.invalidateCaches,
                                                   options.extendedVerify,
-                                                  options.copyData);
+                                                  options.copyData,
+                                                  options.failOnMissingIndex,
+                                                  
options.validateIndexChecksum);
                 long serviceTimeNanos = System.nanoTime() - startTime;
                 if (!failedDirectories.isEmpty())
                 {
@@ -361,6 +366,8 @@ public class SSTableImporter
         final boolean invalidateCaches;
         final boolean extendedVerify;
         final boolean copyData;
+        final boolean failOnMissingIndex;
+        final boolean validateIndexChecksum;
 
         private ImportOptions(Builder builder)
         {
@@ -376,6 +383,8 @@ public class SSTableImporter
             invalidateCaches = builder.invalidateCaches;
             extendedVerify = builder.extendedVerify;
             copyData = builder.copyData;
+            failOnMissingIndex = builder.failOnMissingIndex;
+            validateIndexChecksum = builder.validateIndexChecksum;
         }
 
         /**
@@ -417,6 +426,8 @@ public class SSTableImporter
                    && invalidateCaches == options.invalidateCaches
                    && extendedVerify == options.extendedVerify
                    && copyData == options.copyData
+                   && failOnMissingIndex == options.failOnMissingIndex
+                   && validateIndexChecksum == options.validateIndexChecksum
                    && host.equals(options.host)
                    && keyspace.equals(options.keyspace)
                    && tableName.equals(options.tableName)
@@ -430,7 +441,8 @@ public class SSTableImporter
         public int hashCode()
         {
             return Objects.hash(host, keyspace, tableName, directory, 
uploadId, resetLevel, clearRepaired,
-                                verifySSTables, verifyTokens, 
invalidateCaches, extendedVerify, copyData);
+                                verifySSTables, verifyTokens, 
invalidateCaches, extendedVerify, copyData,
+                                failOnMissingIndex, validateIndexChecksum);
         }
 
         /**
@@ -451,6 +463,8 @@ public class SSTableImporter
                    ", invalidateCaches=" + invalidateCaches +
                    ", extendedVerify=" + extendedVerify +
                    ", copyData=" + copyData +
+                   ", failOnMissingIndex=" + failOnMissingIndex +
+                   ", validateIndexChecksum=" + validateIndexChecksum +
                    '}';
         }
 
@@ -471,6 +485,8 @@ public class SSTableImporter
             private boolean invalidateCaches = DEFAULT_INVALIDATE_CACHES;
             private boolean extendedVerify = DEFAULT_EXTENDED_VERIFY;
             private boolean copyData = DEFAULT_COPY_DATA;
+            private boolean failOnMissingIndex = DEFAULT_FAIL_ON_MISSING_INDEX;
+            private boolean validateIndexChecksum = 
DEFAULT_VALIDATE_INDEX_CHECKSUM;
 
             /**
              * Sets the {@code host} and returns a reference to this Builder 
enabling method chaining.
@@ -616,6 +632,30 @@ public class SSTableImporter
                 return this;
             }
 
+            /**
+             * Sets the {@code failOnMissingIndex} and returns a reference to 
this Builder enabling method chaining.
+             *
+             * @param failOnMissingIndex the {@code failOnMissingIndex} to set
+             * @return a reference to this Builder
+             */
+            public Builder failOnMissingIndex(boolean failOnMissingIndex)
+            {
+                this.failOnMissingIndex = failOnMissingIndex;
+                return this;
+            }
+
+            /**
+             * Sets the {@code validateIndexChecksum} and returns a reference 
to this Builder enabling method chaining.
+             *
+             * @param validateIndexChecksum the {@code validateIndexChecksum} 
to set
+             * @return a reference to this Builder
+             */
+            public Builder validateIndexChecksum(boolean validateIndexChecksum)
+            {
+                this.validateIndexChecksum = validateIndexChecksum;
+                return this;
+            }
+
             /**
              * Returns a {@code ImportOptions} built from the parameters 
previously set.
              *
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
index 8d06dc09..7e505fda 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
@@ -44,7 +44,9 @@ import 
org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics;
 import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl;
 
 import static 
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_COPY_DATA;
+import static 
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_FAIL_ON_MISSING_INDEX;
 import static 
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_INVALIDATE_CACHES;
+import static 
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_VALIDATE_INDEX_CHECKSUM;
 import static 
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_VERIFY_TOKENS;
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -140,7 +142,8 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
         when(mockCFOperations.importNewSSTables("ks", "table", 
stageDirectoryAbsolutePath,
                                                 true, true, true,
                                                 DEFAULT_VERIFY_TOKENS, 
DEFAULT_INVALIDATE_CACHES, true,
-                                                DEFAULT_COPY_DATA))
+                                                DEFAULT_COPY_DATA,
+                                                DEFAULT_FAIL_ON_MISSING_INDEX, 
DEFAULT_VALIDATE_INDEX_CHECKSUM))
         .thenReturn(Collections.singletonList(stageDirectoryAbsolutePath));
 
         String requestURI = "/api/v1/uploads/" + uploadId + 
"/keyspaces/ks/tables/table/import";
@@ -158,7 +161,8 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
         when(mockCFOperations.importNewSSTables("ks", "table", 
stageDirectoryAbsolutePath,
                                                 false, true, true,
                                                 DEFAULT_VERIFY_TOKENS, 
DEFAULT_INVALIDATE_CACHES, true,
-                                                DEFAULT_COPY_DATA))
+                                                DEFAULT_COPY_DATA,
+                                                DEFAULT_FAIL_ON_MISSING_INDEX, 
DEFAULT_VALIDATE_INDEX_CHECKSUM))
         .thenReturn(Collections.emptyList());
 
         String requestURI = "/api/v1/uploads/" + uploadId + 
"/keyspaces/ks/tables/table/import";
@@ -181,7 +185,8 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
         when(mockCFOperations.importNewSSTables("ks", "table", 
stageDirectoryAbsolutePath,
                                                 false, true, true,
                                                 DEFAULT_VERIFY_TOKENS, 
DEFAULT_INVALIDATE_CACHES, true,
-                                                DEFAULT_COPY_DATA))
+                                                DEFAULT_COPY_DATA,
+                                                DEFAULT_FAIL_ON_MISSING_INDEX, 
DEFAULT_VALIDATE_INDEX_CHECKSUM))
         .thenReturn(Collections.emptyList());
 
         String requestURI = "/api/v1/uploads/" + uploadId + 
"/keyspaces/ks/tables/table/import";
@@ -199,7 +204,9 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
                                                                    
DEFAULT_VERIFY_TOKENS,
                                                                    
DEFAULT_INVALIDATE_CACHES,
                                                                    true,
-                                                                   
DEFAULT_COPY_DATA);
+                                                                   
DEFAULT_COPY_DATA,
+                                                                   
DEFAULT_FAIL_ON_MISSING_INDEX,
+                                                                   
DEFAULT_VALIDATE_INDEX_CHECKSUM);
                         context.completeNow();
                     })));
         assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
@@ -214,7 +221,8 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
         when(mockCFOperations.importNewSSTables("ks", "table", 
stageDirectoryAbsolutePath,
                                                 true, false, true,
                                                 DEFAULT_VERIFY_TOKENS, 
DEFAULT_INVALIDATE_CACHES, true,
-                                                DEFAULT_COPY_DATA))
+                                                DEFAULT_COPY_DATA,
+                                                DEFAULT_FAIL_ON_MISSING_INDEX, 
DEFAULT_VALIDATE_INDEX_CHECKSUM))
         .thenReturn(Collections.emptyList());
 
         String requestURI = "/api/v1/uploads/" + uploadId + 
"/keyspaces/ks/tables/table/import";
@@ -232,7 +240,9 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
                                                                    
DEFAULT_VERIFY_TOKENS,
                                                                    
DEFAULT_INVALIDATE_CACHES,
                                                                    true,
-                                                                   
DEFAULT_COPY_DATA);
+                                                                   
DEFAULT_COPY_DATA,
+                                                                   
DEFAULT_FAIL_ON_MISSING_INDEX,
+                                                                   
DEFAULT_VALIDATE_INDEX_CHECKSUM);
                         context.completeNow();
                     })));
         assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
@@ -247,7 +257,8 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
         when(mockCFOperations.importNewSSTables("ks", "table", 
stageDirectoryAbsolutePath,
                                                 true, true, false,
                                                 DEFAULT_VERIFY_TOKENS, 
DEFAULT_INVALIDATE_CACHES, true,
-                                                DEFAULT_COPY_DATA))
+                                                DEFAULT_COPY_DATA,
+                                                DEFAULT_FAIL_ON_MISSING_INDEX, 
DEFAULT_VALIDATE_INDEX_CHECKSUM))
         .thenReturn(Collections.emptyList());
 
         String requestURI = "/api/v1/uploads/" + uploadId + 
"/keyspaces/ks/tables/table/import";
@@ -265,7 +276,9 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
                                                                    
DEFAULT_VERIFY_TOKENS,
                                                                    
DEFAULT_INVALIDATE_CACHES,
                                                                    true,
-                                                                   
DEFAULT_COPY_DATA);
+                                                                   
DEFAULT_COPY_DATA,
+                                                                   
DEFAULT_FAIL_ON_MISSING_INDEX,
+                                                                   
DEFAULT_VALIDATE_INDEX_CHECKSUM);
                         context.completeNow();
                     })));
         assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
@@ -280,7 +293,8 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
         when(mockCFOperations.importNewSSTables("ks", "table", 
stageDirectoryAbsolutePath,
                                                 true, true, true,
                                                 DEFAULT_VERIFY_TOKENS, 
DEFAULT_INVALIDATE_CACHES, false,
-                                                
DEFAULT_COPY_DATA)).thenReturn(Collections.emptyList());
+                                                DEFAULT_COPY_DATA,
+                                                DEFAULT_FAIL_ON_MISSING_INDEX, 
DEFAULT_VALIDATE_INDEX_CHECKSUM)).thenReturn(Collections.emptyList());
 
         String requestURI = "/api/v1/uploads/" + uploadId + 
"/keyspaces/ks/tables/table/import";
         sendRequest(context,
@@ -297,7 +311,9 @@ public class SSTableImportHandlerTest extends 
BaseUploadsHandlerTest
                                                                    
DEFAULT_VERIFY_TOKENS,
                                                                    
DEFAULT_INVALIDATE_CACHES,
                                                                    false,
-                                                                   
DEFAULT_COPY_DATA);
+                                                                   
DEFAULT_COPY_DATA,
+                                                                   
DEFAULT_FAIL_ON_MISSING_INDEX,
+                                                                   
DEFAULT_VALIDATE_INDEX_CHECKSUM);
                         context.completeNow();
                     })));
 
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
index ed799477..4a8a170c 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
@@ -53,6 +53,7 @@ import 
org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.common.ResourceUtils;
 import org.apache.cassandra.sidecar.common.data.ConsistencyLevel;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
@@ -75,6 +76,7 @@ import 
org.apache.cassandra.sidecar.restore.RestoreSliceManifest.ManifestEntry;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.cassandra.sidecar.utils.SSTableImporter;
 import org.apache.cassandra.sidecar.utils.XXHash32Provider;
+import org.mockito.ArgumentCaptor;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
@@ -358,6 +360,44 @@ class RestoreRangeTaskTest
         assertThat(mockRange.hasImported()).isFalse();
     }
 
+    @Test
+    void testCommitPassesSaiOptionsToImporter(@TempDir Path testFolder)
+    {
+        SSTableImportOptions customOptions = SSTableImportOptions.defaults()
+                                                                 
.failOnMissingIndex(true)
+                                                                 
.validateIndexChecksum(true);
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.IMPORT_READY, ConsistencyLevel.QUORUM)
+                                       .unbuild()
+                                       .sstableImportOptions(customOptions)
+                                       .build();
+        
when(mockSSTableImporter.scheduleImport(any())).thenReturn(Future.succeededFuture());
+        RestoreRangeTask task = createTask(mockRange, job);
+        Future<?> result = task.commit(testFolder.toFile());
+        assertThat(result.failed()).isFalse();
+
+        ArgumentCaptor<SSTableImporter.ImportOptions> captor = 
ArgumentCaptor.forClass(SSTableImporter.ImportOptions.class);
+        verify(mockSSTableImporter).scheduleImport(captor.capture());
+        SSTableImporter.ImportOptions captured = captor.getValue();
+
+        SSTableImporter.ImportOptions expected = new 
SSTableImporter.ImportOptions.Builder()
+                                                 
.host(mockRange.owner().host())
+                                                 
.keyspace(mockRange.keyspace())
+                                                 .tableName(mockRange.table())
+                                                 
.directory(testFolder.toFile().toString())
+                                                 
.resetLevel(customOptions.resetLevel())
+                                                 
.clearRepaired(customOptions.clearRepaired())
+                                                 
.verifySSTables(customOptions.verifySSTables())
+                                                 
.verifyTokens(customOptions.verifyTokens())
+                                                 
.invalidateCaches(customOptions.invalidateCaches())
+                                                 
.extendedVerify(customOptions.extendedVerify())
+                                                 
.copyData(customOptions.copyData())
+                                                 
.failOnMissingIndex(customOptions.failOnMissingIndex())
+                                                 
.validateIndexChecksum(customOptions.validateIndexChecksum())
+                                                 
.uploadId(mockRange.uploadId())
+                                                 .build();
+        assertThat(captured).isEqualTo(expected);
+    }
+
     @Test
     void testHandlingUnexpectedExceptionInObjectExistsCheck(@TempDir Path 
testFolder)
     {
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
index 95a0ee44..cd7cdc76 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
@@ -98,14 +98,17 @@ class SSTableImporterTest
         
when(mockMetadataFetcher.instance("127.0.0.3")).thenReturn(mockInstanceMetadata3);
         
when(mockCassandraAdapterDelegate1.tableOperations()).thenReturn(mockTableOperations1);
         when(mockTableOperations1.importNewSSTables("ks", "tbl", "/dir", true, 
true,
-                                                    true, true, true, true, 
false))
+                                                    true, true, true, true, 
false, false, false))
+        .thenReturn(Collections.emptyList());
+        when(mockTableOperations1.importNewSSTables("ks2", "tbl", "/dir", 
true, true,
+                                                    true, true, true, true, 
false, false, false))
         .thenReturn(Collections.emptyList());
         when(mockTableOperations1.importNewSSTables("ks", "tbl", 
"/failed-dir", true, true,
-                                                    true, true, true, true, 
false))
+                                                    true, true, true, true, 
false, false, false))
         .thenReturn(Collections.singletonList("/failed-dir"));
         
when(mockCassandraAdapterDelegate2.tableOperations()).thenReturn(mockTableOperations2);
         when(mockTableOperations2.importNewSSTables("ks", "tbl", "/dir", true, 
true,
-                                                    true, true, true, true, 
false))
+                                                    true, true, true, true, 
false, false, false))
         .thenThrow(new RuntimeException("Exception during import"));
         when(mockCassandraAdapterDelegate3.tableOperations()).thenThrow(new 
CassandraUnavailableException(CQL_AND_JMX, "Cassandra unavailable"));
         executorPools = new ExecutorPools(vertx, serviceConfiguration);
@@ -154,7 +157,7 @@ class SSTableImporterTest
                 assertThat(queue).isEmpty();
             }
             verify(mockTableOperations1, times(1))
-            .importNewSSTables("ks", "tbl", "/dir", true, true, true, true, 
true, true, false);
+            .importNewSSTables("ks", "tbl", "/dir", true, true, true, true, 
true, true, false, false, false);
             vertx.setTimer(100, handle -> {
                 // after successful import, the queue must be drained
                 
assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
@@ -354,9 +357,9 @@ class SSTableImporterTest
                       assertThat(queue).isEmpty();
                   }
                   verify(mockTableOperations1, times(1))
-                  .importNewSSTables("ks", "tbl", "/dir", true, true, true, 
true, true, true, false);
+                  .importNewSSTables("ks", "tbl", "/dir", true, true, true, 
true, true, true, false, false, false);
                   verify(mockTableOperations1, times(1))
-                  .importNewSSTables("ks2", "tbl", "/dir", true, true, true, 
true, true, true, false);
+                  .importNewSSTables("ks2", "tbl", "/dir", true, true, true, 
true, true, true, false, false, false);
                   vertx.setTimer(100, handle -> {
                       // after successful import, the queue must be drained
                       
assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
 
b/server/src/testFixtures/java/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
similarity index 90%
rename from 
server/src/test/integration/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
rename to 
server/src/testFixtures/java/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
index cbb35a22..ebd8dbc6 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
+++ 
b/server/src/testFixtures/java/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
@@ -86,14 +86,21 @@ public class RestoreJobTestUtils
     }
 
     public static UUID createJob(RestoreJobTestUtils.RestoreJobClient 
testClient, QualifiedTableName tableName)
+    {
+        return createJob(testClient, tableName, builder -> { });
+    }
+
+    public static UUID createJob(RestoreJobTestUtils.RestoreJobClient 
testClient, QualifiedTableName tableName,
+                                 
Consumer<CreateRestoreJobRequestPayload.Builder> customizer)
     {
         UUID jobId = UUIDs.timeBased();
         long expireAt = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2);
-        CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload
-                                                 
.builder(RestoreJobSecretsGen.genRestoreJobSecrets(), expireAt)
-                                                 .jobId(jobId)
-                                                 
.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "datacenter1").build();
-        testClient.createRestoreJob(tableName, payload);
+        CreateRestoreJobRequestPayload.Builder builder = 
CreateRestoreJobRequestPayload
+                                                         
.builder(RestoreJobSecretsGen.genRestoreJobSecrets(), expireAt)
+                                                         .jobId(jobId)
+                                                         
.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "datacenter1");
+        customizer.accept(builder);
+        testClient.createRestoreJob(tableName, builder.build());
         return jobId;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to