This is an automated email from the ASF dual-hosted git repository.
yifan-c pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new eb2d55f9 CASSSIDECAR-422: SAI support in Sidecar (#333)
eb2d55f9 is described below
commit eb2d55f987ff90da552aacfb77b3c421d44f0340
Author: Shailaja Koppu <[email protected]>
AuthorDate: Fri May 15 19:38:49 2026 +0100
CASSSIDECAR-422: SAI support in Sidecar (#333)
Patch by Shailaja Koppu; Reviewed by Jyothsna konisa, Yifan Cai for
CASSSIDECAR-422
---
CHANGES.txt | 1 +
.../adapters/base/CassandraTableOperations.java | 28 ++-
.../adapters/cassandra50/Cassandra50Adapter.java | 13 ++
.../cassandra50/Cassandra50TableJmxOperations.java | 59 ++++++
.../cassandra50/Cassandra50TableOperations.java} | 44 ++---
.../sidecar/common/data/SSTableImportOptions.java | 35 +++-
.../common/request/ImportSSTableRequest.java | 34 ++++
.../CreateRestoreJobRequestPayloadTest.java | 8 +-
.../sidecar/client/SidecarClientTest.java | 45 +++++
.../RestoreJobDiscovererSAIOptionsIntTest.java | 207 ++++++++++++++++++++
.../SSTableImportWithSaiIntegrationTest.java | 215 +++++++++++++++++++++
.../sidecar/common/server/TableOperations.java | 32 +++
server/build.gradle | 4 +
.../CassandraInputValidationConfigurationImpl.java | 6 +-
.../handlers/data/SSTableImportRequestParam.java | 39 +++-
.../sstableuploads/SSTableImportHandler.java | 2 +
.../sidecar/restore/RestoreRangeTask.java | 2 +
.../sidecar/utils/FastCassandraInputValidator.java | 11 +-
.../cassandra/sidecar/utils/SSTableImporter.java | 44 ++++-
.../sstableuploads/SSTableImportHandlerTest.java | 36 +++-
.../sidecar/restore/RestoreRangeTaskTest.java | 40 ++++
.../sidecar/utils/SSTableImporterTest.java | 15 +-
.../sidecar/restore/RestoreJobTestUtils.java | 17 +-
23 files changed, 871 insertions(+), 66 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ac2ac99..e721b48e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * SAI support in Sidecar (CASSSIDECAR-422)
* Support column types not parseable by Java 3.x driver (CASSSIDECAR-443)
* CdcManager.getInstanceId(instanceIp) returns -1 as it resolves ipAddress to
null (CASSSIDECAR-417)
* Fix circle CI pipelines OOM (CASSSIDECAR-423)
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
index a4787f38..9975e45a 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
+++
b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.NotNull;
*/
public class CassandraTableOperations implements TableOperations
{
- private final JmxClient jmxClient;
+ protected final JmxClient jmxClient;
public CassandraTableOperations(JmxClient jmxClient)
{
@@ -65,6 +65,30 @@ public class CassandraTableOperations implements
TableOperations
copyData);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * SAI index validation is only supported in Cassandra 5.0+; for older
versions these parameters are ignored.
+ */
+ @Override
+ public List<String> importNewSSTables(@NotNull String keyspace,
+ @NotNull String tableName,
+ @NotNull String directory,
+ boolean resetLevel,
+ boolean clearRepaired,
+ boolean verifySSTables,
+ boolean verifyTokens,
+ boolean invalidateCaches,
+ boolean extendedVerify,
+ boolean copyData,
+ boolean failOnMissingIndex,
+ boolean validateIndexChecksum)
+ {
+ return importNewSSTables(keyspace, tableName, directory, resetLevel,
+ clearRepaired, verifySSTables, verifyTokens,
+ invalidateCaches, extendedVerify, copyData);
+ }
+
/**
* {@inheritDoc}
*/
@@ -75,7 +99,7 @@ public class CassandraTableOperations implements
TableOperations
.getDataPaths();
}
- String tableMBeanName(String keyspace, String tableName)
+ protected String tableMBeanName(String keyspace, String tableName)
{
return
String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
tableName.contains(".") ? "IndexTables" :
"Tables",
diff --git
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
index f7ac798b..49b141cd 100644
---
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50Adapter.java
@@ -26,6 +26,7 @@ import
org.apache.cassandra.sidecar.common.server.CompactionManagerOperations;
import org.apache.cassandra.sidecar.common.server.ICassandraAdapter;
import org.apache.cassandra.sidecar.common.server.JmxClient;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.TableOperations;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher;
@@ -56,6 +57,18 @@ public class Cassandra50Adapter extends CassandraAdapter
return new Cassandra50StorageOperations(jmxClient, dnsResolver);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns Cassandra 5.0-specific TableOperations that supports SAI index
validation
+ */
+ @Override
+ @NotNull
+ protected TableOperations createTableOperations(JmxClient jmxClient)
+ {
+ return new Cassandra50TableOperations(jmxClient);
+ }
+
/**
* {@inheritDoc}
*
diff --git
a/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableJmxOperations.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableJmxOperations.java
new file mode 100644
index 00000000..12b6214a
--- /dev/null
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableJmxOperations.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra50;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.sidecar.adapters.base.jmx.TableJmxOperations;
+
+/**
+ * An interface that extends {@link TableJmxOperations} with the Cassandra 5.0
MBean signature
+ * for {@code importNewSSTables}, which includes SAI index validation
parameters.
+ */
+public interface Cassandra50TableJmxOperations extends TableJmxOperations
+{
+ /**
+ * Load new sstables from the given directory with SAI index validation
support.
+ *
+ * @param srcPaths the path to the new sstables - if it is an
empty set, the data directories will
+ * be scanned
+ * @param resetLevel if the level should be reset to 0 on the new
sstables
+ * @param clearRepaired if repaired info should be wiped from the new
sstables
+ * @param verifySSTables if the new sstables should be verified that
they are not corrupt
+ * @param verifyTokens if the tokens in the new sstables should be
verified that they are owned by the
+ * current node
+ * @param invalidateCaches if row cache should be invalidated for the
keys in the new sstables
+ * @param extendedVerify if we should run an extended verify checking
all values in the new sstables
+ * @param copyData if we should copy data from source paths
instead of moving them
+ * @param failOnMissingIndex if SAI indexes should be validated during
import
+ * @param validateIndexChecksum if SAI index checksums should be
verified during import
+ * @return list of failed import directories
+ */
+ List<String> importNewSSTables(Set<String> srcPaths,
+ boolean resetLevel,
+ boolean clearRepaired,
+ boolean verifySSTables,
+ boolean verifyTokens,
+ boolean invalidateCaches,
+ boolean extendedVerify,
+ boolean copyData,
+ boolean failOnMissingIndex,
+ boolean validateIndexChecksum);
+}
diff --git
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableOperations.java
similarity index 61%
copy from
adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
copy to
adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableOperations.java
index a4787f38..01511665 100644
---
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
+++
b/adapters/adapters-cassandra50/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra50/Cassandra50TableOperations.java
@@ -16,27 +16,24 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.adapters.base;
+package org.apache.cassandra.sidecar.adapters.cassandra50;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import org.apache.cassandra.sidecar.adapters.base.jmx.TableJmxOperations;
+import org.apache.cassandra.sidecar.adapters.base.CassandraTableOperations;
import org.apache.cassandra.sidecar.common.server.JmxClient;
-import org.apache.cassandra.sidecar.common.server.TableOperations;
import org.jetbrains.annotations.NotNull;
/**
- * An implementation of the {@link TableOperations} that interfaces with
Cassandra 4.0 and later
+ * A {@link CassandraTableOperations} extension for Cassandra 5.0 that
supports SAI index validation
+ * parameters when importing SSTables.
*/
-public class CassandraTableOperations implements TableOperations
+public class Cassandra50TableOperations extends CassandraTableOperations
{
- private final JmxClient jmxClient;
-
- public CassandraTableOperations(JmxClient jmxClient)
+ public Cassandra50TableOperations(JmxClient jmxClient)
{
- this.jmxClient = jmxClient;
+ super(jmxClient);
}
/**
@@ -52,9 +49,11 @@ public class CassandraTableOperations implements
TableOperations
boolean verifyTokens,
boolean invalidateCaches,
boolean extendedVerify,
- boolean copyData)
+ boolean copyData,
+ boolean failOnMissingIndex,
+ boolean validateIndexChecksum)
{
- return jmxClient.proxy(TableJmxOperations.class,
tableMBeanName(keyspace, tableName))
+ return jmxClient.proxy(Cassandra50TableJmxOperations.class,
tableMBeanName(keyspace, tableName))
.importNewSSTables(Collections.singleton(directory),
resetLevel,
clearRepaired,
@@ -62,23 +61,8 @@ public class CassandraTableOperations implements
TableOperations
verifyTokens,
invalidateCaches,
extendedVerify,
- copyData);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<String> getDataPaths(@NotNull String keyspace, @NotNull String
table) throws IOException
- {
- return jmxClient.proxy(TableJmxOperations.class,
tableMBeanName(keyspace, table))
- .getDataPaths();
- }
-
- String tableMBeanName(String keyspace, String tableName)
- {
- return
String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
- tableName.contains(".") ? "IndexTables" :
"Tables",
- keyspace, tableName);
+ copyData,
+ failOnMissingIndex,
+ validateIndexChecksum);
}
}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
index 60b6a492..f9c62c44 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java
@@ -18,14 +18,19 @@
package org.apache.cassandra.sidecar.common.data;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
/**
* Options for Cassandra import nodetool command. It is like properties.
* Supports Json serialization and deserialization.
+ * <p>
+ * This class is used exclusively in the S3 restore job path ({@code
CreateRestoreJobRequestPayload}).
*/
-public class SSTableImportOptions extends HashMap<String, String>
+public class SSTableImportOptions extends LinkedHashMap<String, String>
{
+ public static final boolean DEFAULT_FAIL_ON_MISSING_INDEX = false;
+ public static final boolean DEFAULT_VALIDATE_INDEX_CHECKSUM = false;
+
private static final String RESET_LEVEL = "resetLevel";
private static final String CLEAR_REPAIRED = "clearRepaired";
private static final String VERIFY_SSTABLES = "verifySSTables";
@@ -33,6 +38,8 @@ public class SSTableImportOptions extends HashMap<String,
String>
private static final String INVALIDATE_CACHES = "invalidateCaches";
private static final String EXTENDED_VERIFY = "extendedVerify";
private static final String COPY_DATA = "copyData";
+ private static final String FAIL_ON_MISSING_INDEX = "failOnMissingIndex";
+ private static final String VALIDATE_INDEX_CHECKSUM =
"validateIndexChecksum";
public static SSTableImportOptions defaults()
{
@@ -48,6 +55,8 @@ public class SSTableImportOptions extends HashMap<String,
String>
put(INVALIDATE_CACHES, Boolean.toString(true));
put(EXTENDED_VERIFY, Boolean.toString(true));
put(COPY_DATA, Boolean.toString(false)); // note: the default is false
+ put(FAIL_ON_MISSING_INDEX,
Boolean.toString(DEFAULT_FAIL_ON_MISSING_INDEX));
+ put(VALIDATE_INDEX_CHECKSUM,
Boolean.toString(DEFAULT_VALIDATE_INDEX_CHECKSUM));
}
public SSTableImportOptions resetLevel(boolean enabled)
@@ -126,4 +135,26 @@ public class SSTableImportOptions extends HashMap<String,
String>
{
return Boolean.parseBoolean(get(COPY_DATA));
}
+
+ public SSTableImportOptions failOnMissingIndex(boolean enabled)
+ {
+ put(FAIL_ON_MISSING_INDEX, Boolean.toString(enabled));
+ return this;
+ }
+
+ public boolean failOnMissingIndex()
+ {
+ return Boolean.parseBoolean(get(FAIL_ON_MISSING_INDEX));
+ }
+
+ public SSTableImportOptions validateIndexChecksum(boolean enabled)
+ {
+ put(VALIDATE_INDEX_CHECKSUM, Boolean.toString(enabled));
+ return this;
+ }
+
+ public boolean validateIndexChecksum()
+ {
+ return Boolean.parseBoolean(get(VALIDATE_INDEX_CHECKSUM));
+ }
}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
index 7633b944..19919db3 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java
@@ -65,6 +65,8 @@ public class ImportSSTableRequest extends
JsonRequest<SSTableImportResponse>
private Boolean invalidateCaches;
private Boolean extendedVerify;
private Boolean copyData;
+ private Boolean failOnMissingIndex;
+ private Boolean validateIndexChecksum;
public ImportOptions()
{
@@ -153,6 +155,30 @@ public class ImportSSTableRequest extends
JsonRequest<SSTableImportResponse>
this.copyData = copyData;
return this;
}
+
+ /**
+ * Sets the {@code failOnMissingIndex} and returns a reference to this
ImportOptions enabling method chaining.
+ *
+ * @param failOnMissingIndex the {@code failOnMissingIndex} to set
+ * @return a reference to this ImportOptions
+ */
+ public ImportOptions failOnMissingIndex(boolean failOnMissingIndex)
+ {
+ this.failOnMissingIndex = failOnMissingIndex;
+ return this;
+ }
+
+ /**
+ * Sets the {@code validateIndexChecksum} and returns a reference to
this ImportOptions enabling method chaining.
+ *
+ * @param validateIndexChecksum the {@code validateIndexChecksum} to
set
+ * @return a reference to this ImportOptions
+ */
+ public ImportOptions validateIndexChecksum(boolean
validateIndexChecksum)
+ {
+ this.validateIndexChecksum = validateIndexChecksum;
+ return this;
+ }
}
static String requestURI(String keyspace, String tableName, String
uploadId, ImportOptions importOptions)
@@ -205,6 +231,14 @@ public class ImportSSTableRequest extends
JsonRequest<SSTableImportResponse>
{
options.add("copyData=" + importOptions.copyData);
}
+ if (importOptions.failOnMissingIndex != null)
+ {
+ options.add("failOnMissingIndex=" +
importOptions.failOnMissingIndex);
+ }
+ if (importOptions.validateIndexChecksum != null)
+ {
+ options.add("validateIndexChecksum=" +
importOptions.validateIndexChecksum);
+ }
return options;
}
diff --git
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
index e1f831b9..55628270 100644
---
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
+++
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/CreateRestoreJobRequestPayloadTest.java
@@ -65,13 +65,15 @@ class CreateRestoreJobRequestPayloadTest
"\"jobAgent\":\"agent\"," +
"\"secrets\":" +
MAPPER.writeValueAsString(secrets) + "," +
"\"importOptions\":{" +
- "\"verifyTokens\":\"true\"," +
"\"resetLevel\":\"true\"," +
"\"clearRepaired\":\"true\"," +
- "\"extendedVerify\":\"true\"," +
"\"verifySSTables\":\"true\"," +
+ "\"verifyTokens\":\"true\"," +
"\"invalidateCaches\":\"true\"," +
- "\"copyData\":\"false\"}," +
+ "\"extendedVerify\":\"true\"," +
+ "\"copyData\":\"false\"," +
+ "\"failOnMissingIndex\":\"false\"," +
+ "\"validateIndexChecksum\":\"false\"}," +
"\"expireAt\":" + expireAt + "," +
"\"consistencyLevel\":\"LOCAL_QUORUM\"," +
"\"localDatacenter\":\"DC1\"}");
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 336f636f..3cb732d7 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -848,6 +848,51 @@ abstract class SidecarClientTest
assertThat(request.getMethod()).isEqualTo("PUT");
}
+ @Test
+ void testSSTableImportWithSaiOptions() throws Exception
+ {
+ String responseAsString =
"{\"success\":true,\"uploadId\":\"0000-0000\",\"keyspace\":\"cycling\"," +
+ "\"tableName\":\"cyclist_name\"}";
+ MockResponse response = new
MockResponse().setResponseCode(OK.code()).setBody(responseAsString);
+ SidecarInstanceImpl sidecarInstance = instances.get(0);
+ MockWebServer mockWebServer = servers.get(0);
+ mockWebServer.enqueue(response);
+
+ ImportSSTableRequest.ImportOptions options = new
ImportSSTableRequest.ImportOptions()
+ .resetLevel(true)
+ .clearRepaired(true)
+ .verifySSTables(true)
+ .verifyTokens(true)
+ .invalidateCaches(true)
+ .extendedVerify(true)
+ .copyData(true)
+ .failOnMissingIndex(true)
+
.validateIndexChecksum(true);
+ SSTableImportResponse result =
client.importSSTableRequest(sidecarInstance,
+ "cycling",
+
"cyclist_name",
+ "0000-0000",
+ options)
+ .get(30, TimeUnit.SECONDS);
+
+ assertThat(result).isNotNull();
+ assertThat(result.keyspace()).isEqualTo("cycling");
+ assertThat(result.tableName()).isEqualTo("cyclist_name");
+ assertThat(result.success()).isTrue();
+ assertThat(result.uploadId()).isEqualTo("0000-0000");
+
+ assertThat(mockWebServer.getRequestCount()).isEqualTo(1);
+ RecordedRequest request = mockWebServer.takeRequest();
+
assertThat(request.getPath()).isEqualTo(ApiEndpointsV1.SSTABLE_IMPORT_ROUTE
+
.replaceAll(KEYSPACE_PATH_PARAM, "cycling")
+
.replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
+
.replaceAll(ApiEndpointsV1.UPLOAD_ID_PATH_PARAM, "0000-0000")
+ +
"?resetLevel=true&clearRepaired=true&verifySSTables=true&" +
+
"verifyTokens=true&invalidateCaches=true&extendedVerify=true&" +
+
"copyData=true&failOnMissingIndex=true&validateIndexChecksum=true");
+ assertThat(request.getMethod()).isEqualTo("PUT");
+ }
+
@Test
void testUploadSSTableFailsWhenFileDoesNotExist()
{
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
new file mode 100644
index 00000000..d2dcf31e
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/jobdiscoverer/RestoreJobDiscovererSAIOptionsIntTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore.jobdiscoverer;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.inject.AbstractModule;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
+import
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
+import
org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
+import
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.db.RestoreJob;
+import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.RestoreRange;
+import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
+import org.apache.cassandra.sidecar.restore.RestoreJobDiscoverer;
+import org.apache.cassandra.sidecar.restore.RestoreJobTestUtils;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+import static
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob;
+import static
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests verifying that SAI import options (failOnMissingIndex,
validateIndexChecksum) are
+ * correctly persisted and propagated through the restore job pipeline: create
job -> persist -> discover -> create ranges.
+ */
+class RestoreJobDiscovererSAIOptionsIntTest extends
SharedClusterSidecarIntegrationTestBase
+{
+ private static final SimpleCassandraVersion MIN_VERSION_WITH_SAI =
SimpleCassandraVersion.create("5.0.0");
+ private static final String TEST_KEYSPACE = "sai_import_options_ks";
+ private static final QualifiedName TABLE_NAME = new
QualifiedName(TEST_KEYSPACE, "test_table");
+
+ @Override
+ protected void beforeClusterProvisioning()
+ {
+ SimpleCassandraVersion version =
SimpleCassandraVersion.create(testVersion.version());
+ assumeThat(version)
+ .as("SAI indexes are only available in Cassandra 5.0 and later")
+ .isGreaterThanOrEqualTo(MIN_VERSION_WITH_SAI);
+ }
+
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .nodesPerDc(3)
+ .tokenSupplier(nodeIndex ->
+ Collections.singletonList(String.valueOf(new long[]{
0, 1000L, 2000L }[nodeIndex - 1])));
+ }
+
+ @Override
+ protected void startSidecar(ICluster<? extends IInstance> cluster) throws
InterruptedException
+ {
+ serverWrapper = startSidecarWithInstances(List.of(cluster.get(1)),
(AbstractModule) disableRestoreProcessor());
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(TEST_KEYSPACE, Map.of("datacenter1", 2));
+ createTestTable(TABLE_NAME, "CREATE TABLE %s (id text PRIMARY KEY,
name text)");
+ }
+
+ @Override
+ protected void beforeTestStart()
+ {
+ waitForSchemaReady(30, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void testDefaultImportOptions()
+ {
+ QualifiedTableName tableName = new
QualifiedTableName(TABLE_NAME.keyspace(), TABLE_NAME.table());
+ RestoreJobTestUtils.RestoreJobClient testClient =
+ RestoreJobTestUtils.client(trustedClient(), "localhost",
serverWrapper.serverPort);
+
+ // create job with default options
+ UUID jobId = createJob(testClient, tableName);
+
+ RestoreJobDatabaseAccessor jobAccessor =
serverWrapper.injector.getInstance(RestoreJobDatabaseAccessor.class);
+ RestoreJob job = jobAccessor.find(jobId);
+ SSTableImportOptions importOptions = job.importOptions;
+ // verify default SAI options on persisted job
+ assertThat(importOptions.failOnMissingIndex()).isFalse();
+ assertThat(importOptions.validateIndexChecksum()).isFalse();
+
+ short bucketId = 0;
+ CreateSliceRequestPayload slicePayload = new
CreateSliceRequestPayload("sliceId", bucketId, "bucket", "key",
+ "checksum", BigInteger.valueOf(1L), BigInteger.valueOf(1600L),
+ 100L, 100L);
+ testClient.createRestoreSlice(tableName, jobId, slicePayload);
+ testClient.updateRestoreJob(tableName, jobId,
+
UpdateRestoreJobRequestPayload.builder().withStatus(RestoreJobStatus.STAGE_READY).build());
+
+ RestoreJobDiscoverer restoreJobDiscoverer =
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
+ restoreJobDiscoverer.tryExecuteDiscovery();
+
+ // verify ranges were created and link to the correct job
+ RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ assertThat(ranges).isNotEmpty();
+ for (RestoreRange range : ranges)
+ {
+ assertThat(range.jobId()).isEqualTo(jobId);
+ }
+
+ // Re-read the job after discovery to confirm default importOptions
are preserved
+ RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
+ assertThat(jobAfterDiscovery.importOptions.failOnMissingIndex())
+ .describedAs("failOnMissingIndex should be false by default
after discovery")
+ .isFalse();
+ assertThat(jobAfterDiscovery.importOptions.validateIndexChecksum())
+ .describedAs("validateIndexChecksum should be false by default
after discovery")
+ .isFalse();
+ }
+
+ @Test
+ void testSaiImportOptionsPropagatedToRestoreRanges()
+ {
+ QualifiedTableName tableName = new
QualifiedTableName(TABLE_NAME.keyspace(), TABLE_NAME.table());
+ RestoreJobTestUtils.RestoreJobClient testClient =
+ RestoreJobTestUtils.client(trustedClient(), "localhost",
serverWrapper.serverPort);
+
+ // create job with SAI options enabled
+ Consumer<CreateRestoreJobRequestPayload.Builder> enableSaiOptions =
builder ->
+ builder.updateImportOptions(opts ->
opts.failOnMissingIndex(true).validateIndexChecksum(true));
+ UUID jobId = createJob(testClient, tableName, enableSaiOptions);
+
+ // verify SAI options are persisted on the job
+ RestoreJobDatabaseAccessor jobAccessor =
serverWrapper.injector.getInstance(RestoreJobDatabaseAccessor.class);
+ RestoreJob job = jobAccessor.find(jobId);
+ SSTableImportOptions importOptions = job.importOptions;
+ assertThat(importOptions.failOnMissingIndex()).isTrue();
+ assertThat(importOptions.validateIndexChecksum()).isTrue();
+ // verify other defaults are preserved
+ assertThat(importOptions.resetLevel()).isTrue();
+ assertThat(importOptions.clearRepaired()).isTrue();
+ assertThat(importOptions.verifySSTables()).isTrue();
+ assertThat(importOptions.verifyTokens()).isTrue();
+ assertThat(importOptions.invalidateCaches()).isTrue();
+ assertThat(importOptions.extendedVerify()).isTrue();
+ assertThat(importOptions.copyData()).isFalse();
+
+ // create slice and discover ranges
+ short bucketId = 0;
+ CreateSliceRequestPayload slicePayload = new
CreateSliceRequestPayload("sliceId", bucketId, "bucket", "key",
+
"checksum", BigInteger.valueOf(1L), BigInteger.valueOf(1600L),
+
100L, 100L);
+ testClient.createRestoreSlice(tableName, jobId, slicePayload);
+ testClient.updateRestoreJob(tableName, jobId,
+
UpdateRestoreJobRequestPayload.builder().withStatus(RestoreJobStatus.STAGE_READY).build());
+
+ RestoreJobDiscoverer restoreJobDiscoverer =
serverWrapper.injector.getInstance(RestoreJobDiscoverer.class);
+ restoreJobDiscoverer.tryExecuteDiscovery();
+
+ // verify ranges were created by the discoverer and link back to the
correct job
+ RestoreRangeDatabaseAccessor rangeDatabaseAccessor =
serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class);
+ List<RestoreRange> ranges = rangeDatabaseAccessor.findAll(jobId,
bucketId);
+ assertThat(ranges).isNotEmpty();
+ for (RestoreRange range : ranges)
+ {
+ assertThat(range.jobId()).isEqualTo(jobId);
+ }
+
+ // Re-read the job after discovery to confirm importOptions are still
intact.
+ RestoreJob jobAfterDiscovery = jobAccessor.find(jobId);
+ assertThat(jobAfterDiscovery.importOptions.failOnMissingIndex())
+ .describedAs("failOnMissingIndex should be true after discovery")
+ .isTrue();
+ assertThat(jobAfterDiscovery.importOptions.validateIndexChecksum())
+ .describedAs("validateIndexChecksum should be true after discovery")
+ .isTrue();
+ }
+}
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportWithSaiIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportWithSaiIntegrationTest.java
new file mode 100644
index 00000000..fc485db1
--- /dev/null
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportWithSaiIntegrationTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.sstableuploads;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.TestUtils;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.TEST_TABLE_PREFIX;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests for SSTable import with SAI (Storage Attached Index)
parameters.
+ * Validates that SSTables with SAI index metadata can be uploaded and
imported via the Sidecar REST API,
+ * including proper handling of SAI index files and query parameters ({@code
failOnMissingIndex},
+ * {@code validateIndexChecksum}).
+ */
+class SSTableImportWithSaiIntegrationTest extends
SharedClusterSidecarIntegrationTestBase
+{
+ private static final SimpleCassandraVersion MIN_VERSION_WITH_SAI =
SimpleCassandraVersion.create("5.0.0");
+ private static final String WITH_COMPACTION_DISABLED = " WITH COMPACTION =
{\n" +
+ " 'class':
'SizeTieredCompactionStrategy', \n" +
+ " 'enabled':
'false' }";
+
+ static final QualifiedName SAI_TABLE =
TestUtils.uniqueTestTableFullName(TEST_KEYSPACE, TEST_TABLE_PREFIX);
+
+ @Override
+ protected void beforeClusterProvisioning()
+ {
+ SimpleCassandraVersion version =
SimpleCassandraVersion.create(testVersion.version());
+ assumeThat(version)
+ .as("SAI indexes are only available in Cassandra 5.0 and later")
+ .isGreaterThanOrEqualTo(MIN_VERSION_WITH_SAI);
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+ createTestTable(SAI_TABLE,
+ "CREATE TABLE IF NOT EXISTS %s (id text, value text,
PRIMARY KEY(id))"
+ + WITH_COMPACTION_DISABLED + ";");
+
+ cluster.schemaChangeIgnoringStoppedInstances(
+ String.format("CREATE CUSTOM INDEX IF NOT EXISTS %s_sai_idx ON %s
(value) " +
+ "USING
'org.apache.cassandra.index.sai.StorageAttachedIndex';",
+ SAI_TABLE.table(), SAI_TABLE));
+
+ cluster.schemaChangeIgnoringStoppedInstances(
+ String.format("INSERT INTO %s (id, value) VALUES ('a', 'val_a');",
SAI_TABLE));
+ cluster.schemaChangeIgnoringStoppedInstances(
+ String.format("INSERT INTO %s (id, value) VALUES ('b', 'val_b');",
SAI_TABLE));
+ }
+
+ @Test
+ void testSSTableImportWithSaiIndexParams() throws Exception
+ {
+ String snapshotName = SAI_TABLE.table() + "-snapshot";
+
+ // Take snapshot
+ cluster.get(1).nodetoolResult("snapshot",
+ "--tag", snapshotName,
+ "--table", SAI_TABLE.table(),
+ "--", SAI_TABLE.keyspace())
+ .asserts().success();
+
+ // Find snapshot files on the filesystem
+ List<Path> snapshotFiles = findChildFile("127.0.0.1",
SAI_TABLE.keyspace(), snapshotName);
+ assertThat(snapshotFiles).isNotEmpty();
+
+ List<Path> filesToUpload = snapshotFiles.stream()
+ .filter(p ->
p.toFile().isFile())
+ .collect(Collectors.toList());
+ assertThat(filesToUpload).isNotEmpty();
+
+ // SAI index files have '+' in their filename
+ assertThat(filesToUpload.stream().anyMatch(p ->
p.getFileName().toString().contains("+")))
+ .withFailMessage("Expected at least one snapshot file with '+' in its
name (SAI index file)")
+ .isTrue();
+
+ // Upload snapshot files via the REST upload endpoint
+ UUID uploadId = UUID.randomUUID();
+ for (Path path : filesToUpload)
+ {
+ String fileName = path.getFileName().toString();
+ // URLEncoder.encode encodes space as '+', which is correct for
query params but not path segments.
+ // For path segments, '+' is a literal character, so we must
encode it as %2B.
+ String encodedFileName = URLEncoder.encode(fileName,
StandardCharsets.UTF_8)
+ .replace("+", "%2B");
+
+ String uploadRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/"
+ SAI_TABLE.keyspace()
+ + "/tables/" + SAI_TABLE.table() +
"/components/" + encodedFileName;
+ Buffer fileContent = Buffer.buffer(Files.readAllBytes(path));
+ HttpResponse<Buffer> uploadResponse = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "127.0.0.1",
uploadRoute)
+ .sendBuffer(fileContent));
+ assertThat(uploadResponse.statusCode())
+ .withFailMessage("Upload failed for " + fileName + " with status "
+ uploadResponse.statusCode()
+ + ": " + uploadResponse.bodyAsString())
+ .isEqualTo(HttpResponseStatus.OK.code());
+ }
+
+ // Truncate table and wait until empty
+ cluster.schemaChangeIgnoringStoppedInstances(String.format("TRUNCATE
TABLE %s", SAI_TABLE));
+ loopAssert(30, () -> {
+ Object[][] rows = cluster.getFirstRunningInstance()
+ .coordinator()
+ .execute(String.format("SELECT * FROM
%s", SAI_TABLE),
+ ConsistencyLevel.LOCAL_QUORUM);
+ assertThat(rows == null || rows.length == 0)
+ .withFailMessage("Table should be empty after truncation")
+ .isTrue();
+ });
+
+ // Import with SAI index query params, poll until completed
+ String importRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" +
SAI_TABLE.keyspace()
+ + "/tables/" + SAI_TABLE.table() + "/import";
+ loopAssert(300, 1000, () -> {
+ HttpResponse<Buffer> importResponse = getBlocking(
+ trustedClient().put(serverWrapper.serverPort, "127.0.0.1",
importRoute)
+ .addQueryParam("failOnMissingIndex", "true")
+ .addQueryParam("validateIndexChecksum", "true")
+ .send());
+
assertThat(importResponse.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+ });
+
+ // Verify SAI index component files exist on the filesystem.
+ // No new data was written after truncate, so any SAI files must have
come from the imported snapshot.
+ InstanceMetadata instance =
serverWrapper.injector.getInstance(InstancesMetadata.class)
+
.instanceFromHost("127.0.0.1");
+ String dataDir = instance.dataDirs().get(0);
+ Path keyspacePath = Paths.get(dataDir, SAI_TABLE.keyspace());
+ Path tableDir;
+ try (Stream<Path> dirs = Files.list(keyspacePath))
+ {
+ tableDir = dirs.filter(dir ->
dir.getFileName().toString().startsWith(SAI_TABLE.table()))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Table
directory not found"));
+ }
+
+ List<Path> saiFiles;
+ try (Stream<Path> files = Files.list(tableDir))
+ {
+ saiFiles = files.filter(f ->
f.getFileName().toString().toUpperCase().contains("SAI"))
+ .collect(Collectors.toList());
+ }
+ assertThat(saiFiles)
+ .withFailMessage("Expected SAI index component files on the filesystem
after import")
+ .isNotEmpty();
+
+ // Verify imported data is present
+ assertThat(queryIds()).containsExactlyInAnyOrder("a", "b");
+
+ // Add new data after import
+ cluster.schemaChangeIgnoringStoppedInstances(
+ String.format("INSERT INTO %s (id, value) VALUES ('c', 'val_c');",
SAI_TABLE));
+ cluster.schemaChangeIgnoringStoppedInstances(
+ String.format("INSERT INTO %s (id, value) VALUES ('d', 'val_d');",
SAI_TABLE));
+
+ // Verify all data (imported + new) is present
+ assertThat(queryIds()).containsExactlyInAnyOrder("a", "b", "c", "d");
+ }
+
+ private List<String> queryIds()
+ {
+ Object[][] rows = cluster.getFirstRunningInstance()
+ .coordinator()
+ .execute(String.format("SELECT id FROM %s",
SAI_TABLE),
+ ConsistencyLevel.LOCAL_QUORUM);
+ return Stream.of(rows)
+ .map(row -> (String) row[0])
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
index 3d1570ae..03be9ac5 100644
---
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
+++
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/TableOperations.java
@@ -55,6 +55,38 @@ public interface TableOperations
boolean extendedVerify,
boolean copyData);
+ /**
+ * Load new SSTables from the given {@code directory} with SAI index
validation options.
+ * Defaults to ignoring SAI flags for Cassandra versions that do not
support them.
+ *
+ * @param keyspace the keyspace in Cassandra
+ * @param tableName the table name in Cassandra
+ * @param directory the directory to the new SSTables
+ * @param resetLevel if the level should be reset to 0 on the new
SSTables
+ * @param clearRepaired if repaired info should be wiped from the new
SSTables
+ * @param verifySSTables if the new SSTables should be verified that
they are not corrupt
+ * @param verifyTokens if the tokens in the new SSTables should be
verified that they are owned by the
+ * current node
+ * @param invalidateCaches if row cache should be invalidated for the
keys in the new SSTables
+ * @param extendedVerify if we should run an extended verify checking
all values in the new SSTables
+ * @param copyData if we should copy data from source paths
instead of moving them
+ * @param failOnMissingIndex if SAI indexes should be validated during
import
+ * @param validateIndexChecksum if SAI index checksums should be
verified during import
+ * @return list of failed import directories
+ */
+ List<String> importNewSSTables(@NotNull String keyspace,
+ @NotNull String tableName,
+ @NotNull String directory,
+ boolean resetLevel,
+ boolean clearRepaired,
+ boolean verifySSTables,
+ boolean verifyTokens,
+ boolean invalidateCaches,
+ boolean extendedVerify,
+ boolean copyData,
+ boolean failOnMissingIndex,
+ boolean validateIndexChecksum);
+
/**
* Returns a list of data directories for the given {@code table}.
*
diff --git a/server/build.gradle b/server/build.gradle
index 9cf929a2..8b2ae702 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -186,6 +186,8 @@ dependencies {
testFixturesApi(testFixtures(project(":server-common")))
testFixturesApi(testFixtures(project(":vertx-auth-mtls")))
+ testFixturesApi(testFixtures(project(":client-common")))
+ testFixturesApi(testFixtures(project(":test-common")))
testFixturesImplementation("io.vertx:vertx-junit5:${project.vertxVersion}")
testFixturesImplementation("com.google.inject:guice:${guiceVersion}")
testFixturesImplementation('org.mockito:mockito-core:4.10.0')
@@ -195,6 +197,8 @@ dependencies {
testFixturesImplementation("io.vertx:vertx-web:${project.vertxVersion}") {
exclude group: 'junit', module: 'junit'
}
+
testFixturesImplementation("io.vertx:vertx-web-client:${project.vertxVersion}")
+
testFixturesImplementation('com.datastax.cassandra:cassandra-driver-core:3.11.3')
integrationTestImplementation(testFixtures(project(":test-common")))
integrationTestImplementation
"org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
// Needed by the Cassandra dtest framework
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
index db8f4218..7fa29fca 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CassandraInputValidationConfigurationImpl.java
@@ -53,10 +53,12 @@ public class CassandraInputValidationConfigurationImpl
implements CassandraInput
public static final String DEFAULT_ALLOWED_CHARS_FOR_QUOTED_NAME =
"[a-zA-Z_0-9]{1,48}";
public static final String ALLOWED_CHARS_FOR_COMPONENT_NAME_PROPERTY =
"allowed_chars_for_component_name";
public static final String DEFAULT_ALLOWED_CHARS_FOR_COMPONENT_NAME =
- "[a-zA-Z0-9_-]+(\\.db|\\.cql|\\.json|\\.crc32|TOC\\.txt)";
+ "[a-zA-Z0-9_+\\-]+(\\.db|\\.cql|\\.json|\\.crc32|TOC\\.txt)";
public static final String
ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME_PROPERTY =
"allowed_chars_for_restricted_component_name";
- public static final String
DEFAULT_ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME =
"[a-zA-Z0-9_-]+(\\.db|TOC\\.txt)";
+ // '+' is included in the allowed characters to support downloading SAI
files in snapshots
+ public static final String
DEFAULT_ALLOWED_CHARS_FOR_RESTRICTED_COMPONENT_NAME =
+ "[a-zA-Z0-9_+\\-]+(\\.db|TOC\\.txt)";
@JsonProperty(value = VALIDATOR_PROPERTY)
protected final ParameterizedClassConfiguration validatorConfiguration;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
index 280a6daa..80f548cf 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/data/SSTableImportRequestParam.java
@@ -22,6 +22,7 @@ import java.util.Objects;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import static
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
@@ -39,6 +40,8 @@ public class SSTableImportRequestParam extends SSTableUploads
private final boolean invalidateCaches;
private final boolean extendedVerify;
private final boolean copyData;
+ private final boolean failOnMissingIndex;
+ private final boolean validateIndexChecksum;
/**
* Constructs an SSTableImportRequest
@@ -53,10 +56,13 @@ public class SSTableImportRequestParam extends
SSTableUploads
* @param invalidateCaches if row cache should be invalidated for the
keys in the new SSTables
* @param extendedVerify if we should run an extended verify checking
all values in the new SSTables
* @param copyData if we should copy data from source paths
instead of moving them
+ * @param failOnMissingIndex if SAI indexes should be validated during
import
+ * @param validateIndexChecksum if SAI index checksums should be
verified during import
*/
public SSTableImportRequestParam(QualifiedTableName qualifiedTableName,
String uploadId, boolean resetLevel,
boolean clearRepaired, boolean
verifySSTables, boolean verifyTokens,
- boolean invalidateCaches, boolean
extendedVerify, boolean copyData)
+ boolean invalidateCaches, boolean
extendedVerify, boolean copyData,
+ boolean failOnMissingIndex, boolean
validateIndexChecksum)
{
super(qualifiedTableName, uploadId);
this.resetLevel = resetLevel;
@@ -66,6 +72,8 @@ public class SSTableImportRequestParam extends SSTableUploads
this.invalidateCaches = invalidateCaches;
this.extendedVerify = extendedVerify;
this.copyData = copyData;
+ this.failOnMissingIndex = failOnMissingIndex;
+ this.validateIndexChecksum = validateIndexChecksum;
}
/**
@@ -125,6 +133,22 @@ public class SSTableImportRequestParam extends
SSTableUploads
return copyData;
}
+ /**
+ * @return true if SAI indexes should be validated during import, false
otherwise
+ */
+ public boolean failOnMissingIndex()
+ {
+ return failOnMissingIndex;
+ }
+
+ /**
+ * @return true if SAI index checksums should be verified during import,
false otherwise
+ */
+ public boolean validateIndexChecksum()
+ {
+ return validateIndexChecksum;
+ }
+
/**
* {@inheritDoc}
*/
@@ -140,6 +164,8 @@ public class SSTableImportRequestParam extends
SSTableUploads
&& invalidateCaches == that.invalidateCaches
&& extendedVerify == that.extendedVerify
&& copyData == that.copyData
+ && failOnMissingIndex == that.failOnMissingIndex
+ && validateIndexChecksum == that.validateIndexChecksum
&& uploadId().equals(that.uploadId())
&& keyspace().equals(that.keyspace())
&& table().equals(that.table());
@@ -151,7 +177,8 @@ public class SSTableImportRequestParam extends
SSTableUploads
public int hashCode()
{
return Objects.hash(uploadId(), keyspace(), table(), resetLevel,
clearRepaired, verifySSTables,
- verifyTokens, invalidateCaches, extendedVerify,
copyData);
+ verifyTokens, invalidateCaches, extendedVerify,
copyData,
+ failOnMissingIndex, validateIndexChecksum);
}
/**
@@ -170,6 +197,8 @@ public class SSTableImportRequestParam extends
SSTableUploads
", invalidateCaches=" + invalidateCaches +
", extendedVerify=" + extendedVerify +
", copyData=" + copyData +
+ ", failOnMissingIndex=" + failOnMissingIndex +
+ ", validateIndexChecksum=" + validateIndexChecksum +
'}';
}
@@ -191,6 +220,10 @@ public class SSTableImportRequestParam extends
SSTableUploads
parseBooleanQueryParam(request,
"verifyTokens", true),
parseBooleanQueryParam(request,
"invalidateCaches", true),
parseBooleanQueryParam(request,
"extendedVerify", true),
- parseBooleanQueryParam(request,
"copyData", false));
+ parseBooleanQueryParam(request,
"copyData", false),
+ parseBooleanQueryParam(request,
"failOnMissingIndex",
+
SSTableImportOptions.DEFAULT_FAIL_ON_MISSING_INDEX),
+ parseBooleanQueryParam(request,
"validateIndexChecksum",
+
SSTableImportOptions.DEFAULT_VALIDATE_INDEX_CHECKSUM));
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
index 13493029..bf00b238 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandler.java
@@ -208,6 +208,8 @@ public class SSTableImportHandler extends
AbstractHandler<SSTableImportRequestPa
.invalidateCaches(request.invalidateCaches())
.extendedVerify(request.extendedVerify())
.copyData(request.copyData())
+ .failOnMissingIndex(request.failOnMissingIndex())
+ .validateIndexChecksum(request.validateIndexChecksum())
.build();
}
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
index 544cfaec..02f8cc52 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
@@ -580,6 +580,8 @@ public class RestoreRangeTask implements RestoreRangeHandler
.invalidateCaches(options.invalidateCaches())
.extendedVerify(options.extendedVerify())
.copyData(options.copyData())
+
.failOnMissingIndex(options.failOnMissingIndex())
+
.validateIndexChecksum(options.validateIndexChecksum())
.uploadId(range.uploadId())
.build();
Future<Void> future = importer.scheduleImport(importOptions)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
index 99a0f70b..c01038c1 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FastCassandraInputValidator.java
@@ -254,7 +254,7 @@ public class FastCassandraInputValidator extends
RegexBasedCassandraInputValidat
*/
protected boolean isValidComponentNameCharacter(char c)
{
- return isAlphanumeric(c) || isUnderscore(c) || isDash(c);
+ return isAlphanumeric(c) || isUnderscore(c) || isDash(c) || isPlus(c);
}
/**
@@ -293,6 +293,15 @@ public class FastCassandraInputValidator extends
RegexBasedCassandraInputValidat
return c == '-';
}
+ /**
+ * @param c the character to test
+ * @return {@code true} if the input {@code c} is a plus sign, {@code
false} otherwise
+ */
+ protected boolean isPlus(char c)
+ {
+ return c == '+';
+ }
+
/**
* @param configMap the configuration map
* @param key the key in the map
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index e0d0cbde..31c095d8 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -39,6 +39,7 @@ import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.handler.HttpException;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
import org.apache.cassandra.sidecar.common.server.TableOperations;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
@@ -64,6 +65,8 @@ public class SSTableImporter
public static final boolean DEFAULT_INVALIDATE_CACHES = true;
public static final boolean DEFAULT_EXTENDED_VERIFY = true;
public static final boolean DEFAULT_COPY_DATA = false;
+ public static final boolean DEFAULT_FAIL_ON_MISSING_INDEX =
SSTableImportOptions.DEFAULT_FAIL_ON_MISSING_INDEX;
+ public static final boolean DEFAULT_VALIDATE_INDEX_CHECKSUM =
SSTableImportOptions.DEFAULT_VALIDATE_INDEX_CHECKSUM;
private final Vertx vertx;
private final ExecutorPools executorPools;
private final InstanceMetadataFetcher metadataFetcher;
@@ -231,7 +234,9 @@ public class SSTableImporter
options.verifyTokens,
options.invalidateCaches,
options.extendedVerify,
- options.copyData);
+ options.copyData,
+ options.failOnMissingIndex,
+
options.validateIndexChecksum);
long serviceTimeNanos = System.nanoTime() - startTime;
if (!failedDirectories.isEmpty())
{
@@ -361,6 +366,8 @@ public class SSTableImporter
final boolean invalidateCaches;
final boolean extendedVerify;
final boolean copyData;
+ final boolean failOnMissingIndex;
+ final boolean validateIndexChecksum;
private ImportOptions(Builder builder)
{
@@ -376,6 +383,8 @@ public class SSTableImporter
invalidateCaches = builder.invalidateCaches;
extendedVerify = builder.extendedVerify;
copyData = builder.copyData;
+ failOnMissingIndex = builder.failOnMissingIndex;
+ validateIndexChecksum = builder.validateIndexChecksum;
}
/**
@@ -417,6 +426,8 @@ public class SSTableImporter
&& invalidateCaches == options.invalidateCaches
&& extendedVerify == options.extendedVerify
&& copyData == options.copyData
+ && failOnMissingIndex == options.failOnMissingIndex
+ && validateIndexChecksum == options.validateIndexChecksum
&& host.equals(options.host)
&& keyspace.equals(options.keyspace)
&& tableName.equals(options.tableName)
@@ -430,7 +441,8 @@ public class SSTableImporter
public int hashCode()
{
return Objects.hash(host, keyspace, tableName, directory,
uploadId, resetLevel, clearRepaired,
- verifySSTables, verifyTokens,
invalidateCaches, extendedVerify, copyData);
+ verifySSTables, verifyTokens,
invalidateCaches, extendedVerify, copyData,
+ failOnMissingIndex, validateIndexChecksum);
}
/**
@@ -451,6 +463,8 @@ public class SSTableImporter
", invalidateCaches=" + invalidateCaches +
", extendedVerify=" + extendedVerify +
", copyData=" + copyData +
+ ", failOnMissingIndex=" + failOnMissingIndex +
+ ", validateIndexChecksum=" + validateIndexChecksum +
'}';
}
@@ -471,6 +485,8 @@ public class SSTableImporter
private boolean invalidateCaches = DEFAULT_INVALIDATE_CACHES;
private boolean extendedVerify = DEFAULT_EXTENDED_VERIFY;
private boolean copyData = DEFAULT_COPY_DATA;
+ private boolean failOnMissingIndex = DEFAULT_FAIL_ON_MISSING_INDEX;
+ private boolean validateIndexChecksum =
DEFAULT_VALIDATE_INDEX_CHECKSUM;
/**
* Sets the {@code host} and returns a reference to this Builder
enabling method chaining.
@@ -616,6 +632,30 @@ public class SSTableImporter
return this;
}
+ /**
+ * Sets the {@code failOnMissingIndex} and returns a reference to
this Builder enabling method chaining.
+ *
+ * @param failOnMissingIndex the {@code failOnMissingIndex} to set
+ * @return a reference to this Builder
+ */
+ public Builder failOnMissingIndex(boolean failOnMissingIndex)
+ {
+ this.failOnMissingIndex = failOnMissingIndex;
+ return this;
+ }
+
+ /**
+ * Sets the {@code validateIndexChecksum} and returns a reference
to this Builder enabling method chaining.
+ *
+ * @param validateIndexChecksum the {@code validateIndexChecksum}
to set
+ * @return a reference to this Builder
+ */
+ public Builder validateIndexChecksum(boolean validateIndexChecksum)
+ {
+ this.validateIndexChecksum = validateIndexChecksum;
+ return this;
+ }
+
/**
* Returns a {@code ImportOptions} built from the parameters
previously set.
*
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
index 8d06dc09..7e505fda 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableImportHandlerTest.java
@@ -44,7 +44,9 @@ import
org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics;
import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl;
import static
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_COPY_DATA;
+import static
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_FAIL_ON_MISSING_INDEX;
import static
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_INVALIDATE_CACHES;
+import static
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_VALIDATE_INDEX_CHECKSUM;
import static
org.apache.cassandra.sidecar.utils.SSTableImporter.DEFAULT_VERIFY_TOKENS;
import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
import static org.assertj.core.api.Assertions.assertThat;
@@ -140,7 +142,8 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
when(mockCFOperations.importNewSSTables("ks", "table",
stageDirectoryAbsolutePath,
true, true, true,
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES, true,
- DEFAULT_COPY_DATA))
+ DEFAULT_COPY_DATA,
+ DEFAULT_FAIL_ON_MISSING_INDEX,
DEFAULT_VALIDATE_INDEX_CHECKSUM))
.thenReturn(Collections.singletonList(stageDirectoryAbsolutePath));
String requestURI = "/api/v1/uploads/" + uploadId +
"/keyspaces/ks/tables/table/import";
@@ -158,7 +161,8 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
when(mockCFOperations.importNewSSTables("ks", "table",
stageDirectoryAbsolutePath,
false, true, true,
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES, true,
- DEFAULT_COPY_DATA))
+ DEFAULT_COPY_DATA,
+ DEFAULT_FAIL_ON_MISSING_INDEX,
DEFAULT_VALIDATE_INDEX_CHECKSUM))
.thenReturn(Collections.emptyList());
String requestURI = "/api/v1/uploads/" + uploadId +
"/keyspaces/ks/tables/table/import";
@@ -181,7 +185,8 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
when(mockCFOperations.importNewSSTables("ks", "table",
stageDirectoryAbsolutePath,
false, true, true,
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES, true,
- DEFAULT_COPY_DATA))
+ DEFAULT_COPY_DATA,
+ DEFAULT_FAIL_ON_MISSING_INDEX,
DEFAULT_VALIDATE_INDEX_CHECKSUM))
.thenReturn(Collections.emptyList());
String requestURI = "/api/v1/uploads/" + uploadId +
"/keyspaces/ks/tables/table/import";
@@ -199,7 +204,9 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES,
true,
-
DEFAULT_COPY_DATA);
+
DEFAULT_COPY_DATA,
+
DEFAULT_FAIL_ON_MISSING_INDEX,
+
DEFAULT_VALIDATE_INDEX_CHECKSUM);
context.completeNow();
})));
assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
@@ -214,7 +221,8 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
when(mockCFOperations.importNewSSTables("ks", "table",
stageDirectoryAbsolutePath,
true, false, true,
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES, true,
- DEFAULT_COPY_DATA))
+ DEFAULT_COPY_DATA,
+ DEFAULT_FAIL_ON_MISSING_INDEX,
DEFAULT_VALIDATE_INDEX_CHECKSUM))
.thenReturn(Collections.emptyList());
String requestURI = "/api/v1/uploads/" + uploadId +
"/keyspaces/ks/tables/table/import";
@@ -232,7 +240,9 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES,
true,
-
DEFAULT_COPY_DATA);
+
DEFAULT_COPY_DATA,
+
DEFAULT_FAIL_ON_MISSING_INDEX,
+
DEFAULT_VALIDATE_INDEX_CHECKSUM);
context.completeNow();
})));
assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
@@ -247,7 +257,8 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
when(mockCFOperations.importNewSSTables("ks", "table",
stageDirectoryAbsolutePath,
true, true, false,
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES, true,
- DEFAULT_COPY_DATA))
+ DEFAULT_COPY_DATA,
+ DEFAULT_FAIL_ON_MISSING_INDEX,
DEFAULT_VALIDATE_INDEX_CHECKSUM))
.thenReturn(Collections.emptyList());
String requestURI = "/api/v1/uploads/" + uploadId +
"/keyspaces/ks/tables/table/import";
@@ -265,7 +276,9 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES,
true,
-
DEFAULT_COPY_DATA);
+
DEFAULT_COPY_DATA,
+
DEFAULT_FAIL_ON_MISSING_INDEX,
+
DEFAULT_VALIDATE_INDEX_CHECKSUM);
context.completeNow();
})));
assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
@@ -280,7 +293,8 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
when(mockCFOperations.importNewSSTables("ks", "table",
stageDirectoryAbsolutePath,
true, true, true,
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES, false,
-
DEFAULT_COPY_DATA)).thenReturn(Collections.emptyList());
+ DEFAULT_COPY_DATA,
+ DEFAULT_FAIL_ON_MISSING_INDEX,
DEFAULT_VALIDATE_INDEX_CHECKSUM)).thenReturn(Collections.emptyList());
String requestURI = "/api/v1/uploads/" + uploadId +
"/keyspaces/ks/tables/table/import";
sendRequest(context,
@@ -297,7 +311,9 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
DEFAULT_VERIFY_TOKENS,
DEFAULT_INVALIDATE_CACHES,
false,
-
DEFAULT_COPY_DATA);
+
DEFAULT_COPY_DATA,
+
DEFAULT_FAIL_ON_MISSING_INDEX,
+
DEFAULT_VALIDATE_INDEX_CHECKSUM);
context.completeNow();
})));
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
index ed799477..4a8a170c 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
@@ -53,6 +53,7 @@ import
org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider;
import org.apache.cassandra.sidecar.common.ResourceUtils;
import org.apache.cassandra.sidecar.common.data.ConsistencyLevel;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
@@ -75,6 +76,7 @@ import
org.apache.cassandra.sidecar.restore.RestoreSliceManifest.ManifestEntry;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.utils.SSTableImporter;
import org.apache.cassandra.sidecar.utils.XXHash32Provider;
+import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
@@ -358,6 +360,44 @@ class RestoreRangeTaskTest
assertThat(mockRange.hasImported()).isFalse();
}
+ @Test
+ void testCommitPassesSaiOptionsToImporter(@TempDir Path testFolder)
+ {
+ SSTableImportOptions customOptions = SSTableImportOptions.defaults()
+
.failOnMissingIndex(true)
+
.validateIndexChecksum(true);
+ RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(),
RestoreJobStatus.IMPORT_READY, ConsistencyLevel.QUORUM)
+ .unbuild()
+ .sstableImportOptions(customOptions)
+ .build();
+
when(mockSSTableImporter.scheduleImport(any())).thenReturn(Future.succeededFuture());
+ RestoreRangeTask task = createTask(mockRange, job);
+ Future<?> result = task.commit(testFolder.toFile());
+ assertThat(result.failed()).isFalse();
+
+ ArgumentCaptor<SSTableImporter.ImportOptions> captor =
ArgumentCaptor.forClass(SSTableImporter.ImportOptions.class);
+ verify(mockSSTableImporter).scheduleImport(captor.capture());
+ SSTableImporter.ImportOptions captured = captor.getValue();
+
+ SSTableImporter.ImportOptions expected = new
SSTableImporter.ImportOptions.Builder()
+
.host(mockRange.owner().host())
+
.keyspace(mockRange.keyspace())
+ .tableName(mockRange.table())
+
.directory(testFolder.toFile().toString())
+
.resetLevel(customOptions.resetLevel())
+
.clearRepaired(customOptions.clearRepaired())
+
.verifySSTables(customOptions.verifySSTables())
+
.verifyTokens(customOptions.verifyTokens())
+
.invalidateCaches(customOptions.invalidateCaches())
+
.extendedVerify(customOptions.extendedVerify())
+
.copyData(customOptions.copyData())
+
.failOnMissingIndex(customOptions.failOnMissingIndex())
+
.validateIndexChecksum(customOptions.validateIndexChecksum())
+
.uploadId(mockRange.uploadId())
+ .build();
+ assertThat(captured).isEqualTo(expected);
+ }
+
@Test
void testHandlingUnexpectedExceptionInObjectExistsCheck(@TempDir Path
testFolder)
{
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
index 95a0ee44..cd7cdc76 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
@@ -98,14 +98,17 @@ class SSTableImporterTest
when(mockMetadataFetcher.instance("127.0.0.3")).thenReturn(mockInstanceMetadata3);
when(mockCassandraAdapterDelegate1.tableOperations()).thenReturn(mockTableOperations1);
when(mockTableOperations1.importNewSSTables("ks", "tbl", "/dir", true,
true,
- true, true, true, true,
false))
+ true, true, true, true,
false, false, false))
+ .thenReturn(Collections.emptyList());
+ when(mockTableOperations1.importNewSSTables("ks2", "tbl", "/dir",
true, true,
+ true, true, true, true,
false, false, false))
.thenReturn(Collections.emptyList());
when(mockTableOperations1.importNewSSTables("ks", "tbl",
"/failed-dir", true, true,
- true, true, true, true,
false))
+ true, true, true, true,
false, false, false))
.thenReturn(Collections.singletonList("/failed-dir"));
when(mockCassandraAdapterDelegate2.tableOperations()).thenReturn(mockTableOperations2);
when(mockTableOperations2.importNewSSTables("ks", "tbl", "/dir", true,
true,
- true, true, true, true,
false))
+ true, true, true, true,
false, false, false))
.thenThrow(new RuntimeException("Exception during import"));
when(mockCassandraAdapterDelegate3.tableOperations()).thenThrow(new
CassandraUnavailableException(CQL_AND_JMX, "Cassandra unavailable"));
executorPools = new ExecutorPools(vertx, serviceConfiguration);
@@ -154,7 +157,7 @@ class SSTableImporterTest
assertThat(queue).isEmpty();
}
verify(mockTableOperations1, times(1))
- .importNewSSTables("ks", "tbl", "/dir", true, true, true, true,
true, true, false);
+ .importNewSSTables("ks", "tbl", "/dir", true, true, true, true,
true, true, false, false, false);
vertx.setTimer(100, handle -> {
// after successful import, the queue must be drained
assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
@@ -354,9 +357,9 @@ class SSTableImporterTest
assertThat(queue).isEmpty();
}
verify(mockTableOperations1, times(1))
- .importNewSSTables("ks", "tbl", "/dir", true, true, true,
true, true, true, false);
+ .importNewSSTables("ks", "tbl", "/dir", true, true, true,
true, true, true, false, false, false);
verify(mockTableOperations1, times(1))
- .importNewSSTables("ks2", "tbl", "/dir", true, true, true,
true, true, true, false);
+ .importNewSSTables("ks2", "tbl", "/dir", true, true, true,
true, true, true, false, false, false);
vertx.setTimer(100, handle -> {
// after successful import, the queue must be drained
assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
b/server/src/testFixtures/java/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
similarity index 90%
rename from
server/src/test/integration/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
rename to
server/src/testFixtures/java/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
index cbb35a22..ebd8dbc6 100644
---
a/server/src/test/integration/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
+++
b/server/src/testFixtures/java/org/apache/cassandra/sidecar/restore/RestoreJobTestUtils.java
@@ -86,14 +86,21 @@ public class RestoreJobTestUtils
}
public static UUID createJob(RestoreJobTestUtils.RestoreJobClient
testClient, QualifiedTableName tableName)
+ {
+ return createJob(testClient, tableName, builder -> { });
+ }
+
+ public static UUID createJob(RestoreJobTestUtils.RestoreJobClient
testClient, QualifiedTableName tableName,
+
Consumer<CreateRestoreJobRequestPayload.Builder> customizer)
{
UUID jobId = UUIDs.timeBased();
long expireAt = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(2);
- CreateRestoreJobRequestPayload payload = CreateRestoreJobRequestPayload
-
.builder(RestoreJobSecretsGen.genRestoreJobSecrets(), expireAt)
- .jobId(jobId)
-
.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "datacenter1").build();
- testClient.createRestoreJob(tableName, payload);
+ CreateRestoreJobRequestPayload.Builder builder =
CreateRestoreJobRequestPayload
+
.builder(RestoreJobSecretsGen.genRestoreJobSecrets(), expireAt)
+ .jobId(jobId)
+
.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM, "datacenter1");
+ customizer.accept(builder);
+ testClient.createRestoreJob(tableName, builder.build());
return jobId;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]