This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8c20b45 CASSANDRA-19273: Allow setting TTL for snapshots created
8c20b45 is described below
commit 8c20b452dd0728a6fad6d276a7be9fa1b9274495
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Wed Jan 10 11:49:44 2024 -0800
CASSANDRA-19273: Allow setting TTL for snapshots created
Patch by Saranya Krishnakumar; Reviewed by Yifan Cai, Francisco Guerrero for CASSANDRA-19273
---
CHANGES.txt | 1 +
DEV-README.md | 12 ++
cassandra-analytics-core/build.gradle | 1 +
.../cassandra/spark/data/CassandraDataLayer.java | 49 ++---
.../apache/cassandra/spark/data/ClientConfig.java | 209 +++++++++++++++++++++
.../spark/data/CassandraDataLayerTests.java | 69 +++++++
.../cassandra/spark/data/ClientConfigTests.java | 135 +++++++++++++
.../apache/cassandra/analytics/SparkTestUtils.java | 2 +-
.../analytics/data/ClearSnapshotTest.java | 176 +++++++++++++++++
.../org/apache/cassandra/spark/utils/MapUtils.java | 11 ++
10 files changed, 642 insertions(+), 23 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 88d8100..a987271 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Allow setting TTL for snapshots created by Analytics bulk reader (CASSANDRA-19273)
 * Fix range split and use open-closed range notation consistently (CASSANDRA-19325)
 * Add integration tests using in-jvm-dtest to cover blocklisted instances (CASSANDRA-19272)
 * Fix bulk writer consistency level validations for blocked instances (CASSANDRA-19257)
diff --git a/DEV-README.md b/DEV-README.md
index 2a2f818..2b3bec6 100644
--- a/DEV-README.md
+++ b/DEV-README.md
@@ -89,6 +89,18 @@ To enable git hooks, run the following command at project root.
git config core.hooksPath githooks
```
+## Running Integration Tests
+
+To run the integration tests, build the dependencies by following the instructions in the Dependencies section and
+configure the IP aliases needed for the integration tests.
+
+### macOS network aliases
+Create a temporary alias for every node except the first:
+
+```shell
+ for i in {2..20}; do sudo ifconfig lo0 alias "127.0.0.${i}"; done
+```
+
## IntelliJ
The project is well-supported in IntelliJ.
diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle
index 7bc4ed5..9228741 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -70,6 +70,7 @@ project(':cassandra-analytics-core') {
testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}")
testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}")
+ testImplementation("org.assertj:assertj-core:3.24.2")
    testImplementation(group: 'org.quicktheories', name: 'quicktheories', version: "${project.rootProject.quickTheoriesVersion}")
    testImplementation(group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26')
    testImplementation(group: 'org.mockito', name: 'mockito-core', version: "${project.rootProject.mockitoVersion}")
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 675cd1f..5a0ca5a 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -260,7 +260,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
            // Use create snapshot request to capture instance availability hint
            LOGGER.info("Creating snapshot snapshotName={} keyspace={} table={} dc={}",
                        snapshotName, maybeQuotedKeyspace, maybeQuotedTable, datacenter);
-           snapshotFuture = ringFuture.thenCompose(this::createSnapshot);
+           snapshotFuture = ringFuture.thenCompose(ringResponse -> createSnapshot(options, ringResponse));
}
else
{
@@ -290,8 +290,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
    protected void shutdownHook(ClientConfig options)
    {
-       // Preserves previous behavior, but we may just want to check for the clearSnapshot option in the future
-       if (options.clearSnapshot())
+       ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = options.clearSnapshotStrategy();
+       if (clearSnapshotStrategy.shouldClearOnCompletion())
        {
            if (options.createSnapshot())
            {
@@ -305,6 +305,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                            snapshotName, keyspace, table, datacenter);
            }
        }
+       else if (clearSnapshotStrategy.hasTTL())
+       {
+           LOGGER.warn("Skipping clearing snapshot because clearSnapshotStrategy '{}' is used", clearSnapshotStrategy);
+       }
try
{
@@ -316,7 +320,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
        }
    }
-   private CompletionStage<Map<String, AvailabilityHint>> createSnapshot(RingResponse ring)
+   private CompletionStage<Map<String, AvailabilityHint>> createSnapshot(ClientConfig options, RingResponse ring)
    {
        Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints = new ConcurrentHashMap<>(ring.size());
@@ -341,24 +345,25 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                LOGGER.info("Creating snapshot on instance snapshotName={} keyspace={} table={} datacenter={} fqdn={}",
                            snapshotName, maybeQuotedKeyspace, maybeQuotedTable, datacenter, ringEntry.fqdn());
                SidecarInstance sidecarInstance = new SidecarInstanceImpl(ringEntry.fqdn(), sidecarClientConfig.effectivePort());
-               createSnapshotFuture = sidecar
-                       .createSnapshot(sidecarInstance, maybeQuotedKeyspace, maybeQuotedTable, snapshotName)
-                       .handle((resp, throwable) -> {
-                           if (throwable == null)
-                           {
-                               // Create snapshot succeeded
-                               return hint;
-                           }
-
-                           if (isExhausted(throwable))
-                           {
-                               LOGGER.warn("Failed to create snapshot on instance", throwable);
-                               return PartitionedDataLayer.AvailabilityHint.DOWN;
-                           }
-
-                           LOGGER.error("Unexpected error creating snapshot on instance", throwable);
-                           return PartitionedDataLayer.AvailabilityHint.UNKNOWN;
-                       });
+               ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = options.clearSnapshotStrategy();
+               createSnapshotFuture = sidecar.createSnapshot(sidecarInstance, maybeQuotedKeyspace,
+                                                             maybeQuotedTable, snapshotName, clearSnapshotStrategy.ttl())
+                                             .handle((resp, throwable) -> {
+                                                 if (throwable == null)
+                                                 {
+                                                     // Create snapshot succeeded
+                                                     return hint;
+                                                 }
+
+                                                 if (isExhausted(throwable))
+                                                 {
+                                                     LOGGER.warn("Failed to create snapshot on instance", throwable);
+                                                     return PartitionedDataLayer.AvailabilityHint.DOWN;
+                                                 }
+
+                                                 LOGGER.error("Unexpected error creating snapshot on instance", throwable);
+                                                 return PartitionedDataLayer.AvailabilityHint.UNKNOWN;
+                                             });
}
return createSnapshotFuture
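
For reference, a bulk read job opts into the new snapshot TTL behaviour purely through reader options. A minimal sketch follows (not part of this patch; the data source format name and the class name are assumptions and may differ in your deployment):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BulkReaderTtlSnapshotExample
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder()
                                         .appName("bulk-reader-ttl-snapshot-example")
                                         .master("local[*]")
                                         .getOrCreate();

        // The format class below is an assumption; use the bulk reader data source
        // registered in your deployment.
        Dataset<Row> df = spark.read()
                               .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
                               .option("sidecar_instances", "localhost")
                               .option("keyspace", "big-data")
                               .option("table", "customers")
                               // New in this patch: clear on completion, or via TTL if the job dies early
                               .option("clearSnapshotStrategy", "onCompletionOrTTL 12h")
                               .load();

        System.out.println(df.count());
        spark.stop();
    }
}
```

With onCompletionOrTTL, the shutdown hook still clears the snapshot on normal completion, and the TTL acts as a safety net if the job is killed before the hook runs.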
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
index d1941ee..5330f1b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
@@ -24,18 +24,25 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.bridge.BigNumberConfigImpl;
import org.apache.cassandra.spark.config.SchemaFeature;
import org.apache.cassandra.spark.config.SchemaFeatureSet;
import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
import org.apache.cassandra.spark.utils.MapUtils;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.cassandra.spark.data.CassandraDataLayer.aliasLastModifiedTimestamp;
public final class ClientConfig
{
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClientConfig.class);
+
public static final String SIDECAR_INSTANCES = "sidecar_instances";
public static final String KEYSPACE_KEY = "keyspace";
public static final String TABLE_KEY = "table";
@@ -43,6 +50,19 @@ public final class ClientConfig
public static final String DC_KEY = "dc";
public static final String CREATE_SNAPSHOT_KEY = "createSnapshot";
public static final String CLEAR_SNAPSHOT_KEY = "clearSnapshot";
+    /**
+     * The clearSnapshotStrategy option has the format {strategy [snapshotTTLvalue]}; it holds the strategy
+     * and, for TTL-based strategies, the TTL value. For example: onCompletionOrTTL 2d, TTL 2d, noOp, onCompletion.
+     * For the allowed clear snapshot strategies, see {@link ClearSnapshotStrategy}.
+     */
+    public static final String CLEAR_SNAPSHOT_STRATEGY_KEY = "clearSnapshotStrategy";
+    /**
+     * The TTL value is the time-to-live option for the snapshot (available since Cassandra 4.1). The TTL value
+     * must include a unit. For example, 2d represents a TTL of 2 days; 1h represents a TTL of 1 hour, etc.
+     * Valid units are {@code d}, {@code h}, {@code m} and {@code s}.
+     */
+    public static final String DEFAULT_SNAPSHOT_TTL_VALUE = "2d";
+    public static final String SNAPSHOT_TTL_PATTERN = "\\d+(d|h|m|s)";
public static final String DEFAULT_PARALLELISM_KEY = "defaultParallelism";
public static final String NUM_CORES_KEY = "numCores";
public static final String CONSISTENCY_LEVEL_KEY = "consistencyLevel";
@@ -67,6 +87,7 @@ public final class ClientConfig
private final String datacenter;
private final boolean createSnapshot;
private final boolean clearSnapshot;
+ private final ClearSnapshotStrategy clearSnapshotStrategy;
private final int defaultParallelism;
private final int numCores;
private final ConsistencyLevel consistencyLevel;
@@ -91,6 +112,11 @@ public final class ClientConfig
this.datacenter = options.get(MapUtils.lowerCaseKey(DC_KEY));
        this.createSnapshot = MapUtils.getBoolean(options, CREATE_SNAPSHOT_KEY, true);
        this.clearSnapshot = MapUtils.getBoolean(options, CLEAR_SNAPSHOT_KEY, createSnapshot);
+        String clearSnapshotStrategyOption = MapUtils.getOrDefault(options, CLEAR_SNAPSHOT_STRATEGY_KEY, null);
+
+        this.clearSnapshotStrategy = parseClearSnapshotStrategy(MapUtils.containsKey(options, CLEAR_SNAPSHOT_KEY),
+                                                                clearSnapshot,
+                                                                clearSnapshotStrategyOption);
        this.defaultParallelism = MapUtils.getInt(options, DEFAULT_PARALLELISM_KEY, 1);
        this.numCores = MapUtils.getInt(options, NUM_CORES_KEY, 1);
        this.consistencyLevel = Optional.ofNullable(options.get(MapUtils.lowerCaseKey(CONSISTENCY_LEVEL_KEY)))
@@ -109,6 +135,50 @@ public final class ClientConfig
        this.quoteIdentifiers = MapUtils.getBoolean(options, QUOTE_IDENTIFIERS, false);
}
+    protected ClearSnapshotStrategy parseClearSnapshotStrategy(boolean hasDeprecatedOption,
+                                                               boolean clearSnapshot,
+                                                               String clearSnapshotStrategyOption)
+    {
+        if (hasDeprecatedOption)
+        {
+            LOGGER.warn("The deprecated option 'clearSnapshot' is set. Please set 'clearSnapshotStrategy' instead.");
+            if (clearSnapshotStrategyOption == null)
+            {
+                return clearSnapshot ? ClearSnapshotStrategy.defaultStrategy() : new ClearSnapshotStrategy.NoOp();
+            }
+        }
+        if (clearSnapshotStrategyOption == null)
+        {
+            LOGGER.debug("No clearSnapshotStrategy is set. Using the default strategy");
+            return ClearSnapshotStrategy.defaultStrategy();
+        }
+        String[] strategyParts = clearSnapshotStrategyOption.split(" ", 2);
+        String strategyName;
+        String snapshotTTL = null;
+        if (strategyParts.length == 1)
+        {
+            strategyName = strategyParts[0].trim();
+        }
+        else if (strategyParts.length == 2)
+        {
+            strategyName = strategyParts[0].trim();
+            snapshotTTL = strategyParts[1].trim();
+            if (!Pattern.matches(SNAPSHOT_TTL_PATTERN, snapshotTTL))
+            {
+                String msg = "Incorrect value set for clearSnapshotStrategy, expected format is " +
+                             "{strategy [snapshotTTLvalue]}. The TTL value must include a unit, " +
+                             "e.g. 2d represents a TTL of 2 days. Allowed units are d, h, m and s.";
+                throw new IllegalArgumentException(msg);
+            }
+        }
+        else
+        {
+            LOGGER.error("Invalid value for ClearSnapshotStrategy: '{}'", clearSnapshotStrategyOption);
+            throw new IllegalArgumentException("Invalid value: " + clearSnapshotStrategyOption);
+        }
+        return ClearSnapshotStrategy.create(strategyName, snapshotTTL);
+    }
+
public String sidecarInstances()
{
return sidecarInstances;
@@ -146,6 +216,11 @@ public final class ClientConfig
return clearSnapshot;
}
+ public ClearSnapshotStrategy clearSnapshotStrategy()
+ {
+ return clearSnapshotStrategy;
+ }
+
public int defaultParallelism()
{
return defaultParallelism;
@@ -237,4 +312,138 @@ public final class ClientConfig
}
return requestedFeatures;
}
+
+ abstract static class ClearSnapshotStrategy
+ {
+ private final String snapshotTTL;
+
+ static ClearSnapshotStrategy create(String name, String snapshotTTL)
+ {
+ String stripped = name.trim();
+ if (stripped.equalsIgnoreCase(OnCompletion.class.getSimpleName()))
+ {
+ return new OnCompletion();
+ }
+ else if (stripped.equalsIgnoreCase(TTL.class.getSimpleName()))
+ {
+ return new TTL(snapshotTTL);
+ }
+            else if (stripped.equalsIgnoreCase(OnCompletionOrTTL.class.getSimpleName()))
+ {
+ return new OnCompletionOrTTL(snapshotTTL);
+ }
+ else if (stripped.equalsIgnoreCase(NoOp.class.getSimpleName()))
+ {
+ return new NoOp();
+ }
+ else
+ {
+ ClearSnapshotStrategy defaultStrategy = defaultStrategy();
+ LOGGER.warn("Unknown ClearSnapshotStrategy {} is passed. Fall
back to default strategy {}.",
+ name, defaultStrategy);
+ throw new IllegalArgumentException("Invalid
ClearSnapshotStrategy " + name + " passed");
+ }
+ }
+
+ public static ClearSnapshotStrategy defaultStrategy()
+ {
+ LOGGER.info("A default TTL value of {} is added to the snapshot.
If the job takes longer than {}, " +
+ "the snapshot will be cleared before job completion
leading to errors.",
+ DEFAULT_SNAPSHOT_TTL_VALUE,
DEFAULT_SNAPSHOT_TTL_VALUE);
+ return new OnCompletionOrTTL(DEFAULT_SNAPSHOT_TTL_VALUE);
+ }
+
+ abstract boolean shouldClearOnCompletion();
+
+ void validateTTLPresence(boolean expectTTL)
+ {
+ if (expectTTL && !hasTTL())
+ {
+ throw new IllegalArgumentException("Incorrect value set for
clearSnapshotStrategy, expected format " +
+ "is {strategy
[snapshotTTLvalue]}. TTL value specified must " +
+ "contain unit along. For
e.g. 2d represents a TTL for 2 days. " +
+ "Allowed units are d, h, m
and s.");
+ }
+ }
+
+ boolean hasTTL()
+ {
+ return snapshotTTL != null && !snapshotTTL.isEmpty();
+ }
+
+ @Nullable
+ String ttl()
+ {
+ return snapshotTTL;
+ }
+
+ @Override
+ public String toString()
+ {
+            return this.getClass().getSimpleName() + (hasTTL() ? ' ' + ttl() : "");
+ }
+
+ protected ClearSnapshotStrategy(String snapshotTTL)
+ {
+ this.snapshotTTL = snapshotTTL;
+ }
+
+ static class OnCompletion extends ClearSnapshotStrategy
+ {
+ protected OnCompletion()
+ {
+ super(null);
+ }
+
+ @Override
+ boolean shouldClearOnCompletion()
+ {
+ return true;
+ }
+ }
+
+ static class NoOp extends ClearSnapshotStrategy
+ {
+ protected NoOp()
+ {
+ super(null);
+ }
+
+ @Override
+ boolean shouldClearOnCompletion()
+ {
+ return false;
+ }
+ }
+
+ static class OnCompletionOrTTL extends ClearSnapshotStrategy
+ {
+ protected OnCompletionOrTTL(@NotNull String snapshotTTL)
+ {
+ super(snapshotTTL);
+ validateTTLPresence(true);
+ }
+
+ @Override
+ boolean shouldClearOnCompletion()
+ {
+ return true;
+ }
+ }
+
+ static class TTL extends ClearSnapshotStrategy
+ {
+ protected TTL(@NotNull String snapshotTTL)
+ {
+ super(snapshotTTL);
+ validateTTLPresence(true);
+ }
+
+ @Override
+ boolean shouldClearOnCompletion()
+ {
+ return false;
+ }
+ }
+ }
}
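
The TTL token accepted by these strategies is validated solely against SNAPSHOT_TTL_PATTERN above. A small standalone sketch (not part of this patch; the class name is illustrative) shows which tokens pass the pattern, consistent with the positive and negative cases exercised in ClientConfigTests below:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class SnapshotTtlPatternCheck
{
    // Mirrors ClientConfig.SNAPSHOT_TTL_PATTERN from the patch
    private static final String SNAPSHOT_TTL_PATTERN = "\\d+(d|h|m|s)";

    public static void main(String[] args)
    {
        // The strategy name is split off before matching, so only the bare TTL token is tested here
        List<String> candidates = Arrays.asList("2d", "200s", "60m", "10h", "3.5h", "2 h", "2ms", "TTL 10h");
        for (String candidate : candidates)
        {
            // Prints true for 2d, 200s, 60m and 10h; false for the rest
            System.out.printf("%-8s -> %b%n", candidate, Pattern.matches(SNAPSHOT_TTL_PATTERN, candidate));
        }
    }
}
```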
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
new file mode 100644
index 0000000..6baf693
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class CassandraDataLayerTests
+{
+    public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS = ImmutableMap.of(
+ "keyspace", "big-data",
+ "table", "customers",
+ "sidecar_instances", "localhost");
+
+ @Test
+ void testDefaultClearSnapshotStrategy()
+ {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ ClientConfig clientConfig = ClientConfig.create(options);
+ assertEquals("big-data", clientConfig.keyspace());
+ assertEquals("customers", clientConfig.table());
+ assertEquals("localhost", clientConfig.sidecarInstances());
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = clientConfig.clearSnapshotStrategy();
+ assertTrue(clearSnapshotStrategy.shouldClearOnCompletion());
+ assertEquals("2d", clearSnapshotStrategy.ttl());
+ }
+
+ @ParameterizedTest
+ @CsvSource({"false, NOOP", "true,ONCOMPLETIONORTTL 2d"})
+    void testClearSnapshotOptionSupport(Boolean clearSnapshot, String expectedClearSnapshotStrategyOption)
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ options.put("clearsnapshot", clearSnapshot.toString());
+ ClientConfig clientConfig = ClientConfig.create(options);
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = clientConfig.clearSnapshotStrategy();
+        ClientConfig.ClearSnapshotStrategy expectedClearSnapshotStrategy
+                = clientConfig.parseClearSnapshotStrategy(false, false, expectedClearSnapshotStrategyOption);
+        assertThat(clearSnapshotStrategy.shouldClearOnCompletion())
+                .isEqualTo(expectedClearSnapshotStrategy.shouldClearOnCompletion());
+        assertThat(clearSnapshotStrategy.hasTTL()).isEqualTo(expectedClearSnapshotStrategy.hasTTL());
+        assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedClearSnapshotStrategy.ttl());
+ }
+}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
new file mode 100644
index 0000000..7e82c91
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.apache.cassandra.spark.data.ClientConfig.SNAPSHOT_TTL_PATTERN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class ClientConfigTests
+{
+    public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS = ImmutableMap.of(
+ "keyspace", "big-data",
+ "table", "customers",
+ "sidecar_instances", "localhost");
+
+ @ParameterizedTest
+ @ValueSource(strings = {"2h", "200s", "4d", "60m", " 60m", "50s ", " 32d
"})
+ void testPositiveSnapshotTTLPatterns(String input)
+ {
+ assertThat(input.trim()).matches(SNAPSHOT_TTL_PATTERN);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"", " ", "2 h", "200", "d", "6 0m", " h", "3.5h",
".8m", "4.d", "1e+7m"})
+ void testNegativeSnapshotTTLPatterns(String input)
+ {
+ assertThat(input).doesNotMatch(SNAPSHOT_TTL_PATTERN);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"false,false,noop", "true,false,NoOp", "true,true, Noop ",
"false,false,noop 50m"})
+ void testValidNoOpClearSnapshotStrategyParsing(boolean
hasDeprecatedSnapshotOption, boolean clearSnapshot,
+ String option)
+ {
+ ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+ = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption,
clearSnapshot);
+
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.NoOp.class);
+ assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isFalse();
+ assertThat(clearSnapshotStrategy.hasTTL()).isFalse();
+ assertThat(clearSnapshotStrategy.ttl()).isNull();
+ }
+
+ @ParameterizedTest
+ @CsvSource({"false,false,tTL 10h,10h", "true,false, TTL 5d ,5d",
"true,true, Ttl 2m ,2m"})
+ void testValidTTLClearSnapshotStrategyParsing(boolean
hasDeprecatedSnapshotOption, boolean clearSnapshot,
+ String option, String
expectedSnapshotTTL)
+ {
+ ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+ = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption,
clearSnapshot);
+
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.TTL.class);
+ assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isFalse();
+ assertThat(clearSnapshotStrategy.hasTTL()).isTrue();
+ assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedSnapshotTTL);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"false,false,onCompletion", "true,false,OnCoMpLeTiOn",
"true,true, ONCOMPLETION ",
+ "false,false,OnCoMpLeTiOn 5h"})
+ void testValidOnCompletionClearSnapshotStrategyParsing(boolean
hasDeprecatedSnapshotOption, boolean clearSnapshot,
+ String option)
+ {
+ ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+ = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption,
clearSnapshot);
+
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.OnCompletion.class);
+ assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isTrue();
+ assertThat(clearSnapshotStrategy.hasTTL()).isFalse();
+ assertThat(clearSnapshotStrategy.ttl()).isNull();
+ }
+
+ @ParameterizedTest
+ @CsvSource({"false,false,onCompletionOrTTL 200m, 200m",
"true,false,oNcOmPlEtIoNoRtTL 0560m,0560m",
+ "true,true, ONCOMPLETIONORTTL 3d, 3d"})
+ void testValidOnCompletionOrTTLClearSnapshotStrategyParsing(boolean
hasDeprecatedSnapshotOption,
+ boolean
clearSnapshot, String option,
+ String
expectedSnapshotTTL)
+ {
+ ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+ = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption,
clearSnapshot);
+
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.OnCompletionOrTTL.class);
+ assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isTrue();
+ assertThat(clearSnapshotStrategy.hasTTL()).isTrue();
+ assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedSnapshotTTL);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"delete 10h", "ttl5d", "Ttl 2ms", "TTL", "tTL", "No Op", "on
Completion", "ON COMPLETION 3d",
+ "onCompletionOrTTL ", "oN cOmPlEtIoNoRtTL 560m"})
+ void testInValidClearSnapshotStrategyParsing(String option)
+ {
+ Map<String, String> options = new
HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ options.put("clearsnapshotstrategy", option);
+ assertThatThrownBy(() -> {
+ ClientConfig clientConfig = ClientConfig.create(options);
+ clientConfig.parseClearSnapshotStrategy(false, false, option);
+ })
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+    private ClientConfig.ClearSnapshotStrategy getClearSnapshotStrategy(String clearSnapshotStrategyOption,
+                                                                        boolean hasDeprecatedSnapshotOption,
+                                                                        boolean clearSnapshot)
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("clearsnapshotstrategy", clearSnapshotStrategyOption);
+        ClientConfig clientConfig = ClientConfig.create(options);
+        return clientConfig.parseClearSnapshotStrategy(hasDeprecatedSnapshotOption,
+                                                       clearSnapshot,
+                                                       clearSnapshotStrategyOption);
+ }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 8e719f0..3697956 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -77,7 +77,7 @@ public final class SparkTestUtils
            // Shutdown hooks are called after the job ends, and in the case of integration tests
            // the sidecar is already shut down before this. Since the cluster will be torn
            // down anyway, the integration job skips clearing snapshots.
- .option("clearSnapshot", "false")
+ .option("clearSnapshotStrategy", "noop")
.option("defaultParallelism", sc.defaultParallelism())
.option("numCores", numCores)
.option("sizing", "default")
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/data/ClearSnapshotTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/data/ClearSnapshotTest.java
new file mode 100644
index 0000000..a363e44
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/data/ClearSnapshotTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.data;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+import com.vdurmont.semver4j.Semver;
+import org.apache.cassandra.analytics.SharedClusterSparkIntegrationTestBase;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.distributed.shared.WithProperties;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.testing.TestVersion;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ClearSnapshotTest extends SharedClusterSparkIntegrationTestBase
+{
+ private static final WithProperties properties = new WithProperties();
+    static final QualifiedName TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY
+            = new QualifiedName(TEST_KEYSPACE, "test_ttl_clear_snapshot_strategy");
+    static final QualifiedName TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY
+            = new QualifiedName(TEST_KEYSPACE, "test_no_op_clear_snapshot_strategy");
+    static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
+
+ @Test
+ void testTTLClearSnapshotStrategy()
+ {
+        DataFrameReader readDf = bulkReaderDataFrame(TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY)
+                                 .option("snapshotName", "ttlClearSnapshotStrategyTest")
+                                 .option("clearSnapshotStrategy", "TTL 10s");
+ List<Row> rows = readDf.load().collectAsList();
+ assertThat(rows.size()).isEqualTo(8);
+
+ String[] dataDirs = (String[]) cluster.getFirstRunningInstance()
+ .config()
+ .getParams()
+ .get("data_file_directories");
+ String dataDir = dataDirs[0];
+        List<Path> snapshotPaths = findChildFile(Paths.get(dataDir), "ttlClearSnapshotStrategyTest");
+ assertThat(snapshotPaths).isNotEmpty();
+ Path snapshot = snapshotPaths.get(0);
+ assertThat(snapshot).exists();
+
+        // Wait up to 30 seconds to make sure files are cleared after TTLs have expired
+ int wait = 0;
+ while (Files.exists(snapshot) && wait++ < 30)
+ {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ assertThat(snapshot).doesNotExist();
+ }
+
+ @Test
+ void testNoOpClearSnapshotStrategy()
+ {
+        DataFrameReader readDf = bulkReaderDataFrame(TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY)
+                                 .option("snapshotName", "noOpClearSnapshotStrategyTest")
+                                 .option("clearSnapshotStrategy", "noOp");
+ List<Row> rows = readDf.load().collectAsList();
+ assertThat(rows.size()).isEqualTo(8);
+
+ String[] dataDirs = (String[]) cluster.getFirstRunningInstance()
+ .config()
+ .getParams()
+ .get("data_file_directories");
+ String dataDir = dataDirs[0];
+        List<Path> snapshotPaths = findChildFile(Paths.get(dataDir), "noOpClearSnapshotStrategyTest");
+ assertThat(snapshotPaths).isNotEmpty();
+ Path snapshot = snapshotPaths.get(0);
+ assertThat(snapshot).exists();
+
+ Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
+ assertThat(snapshot).exists();
+ }
+
+ private List<Path> findChildFile(Path path, String target)
+ {
+ try (Stream<Path> walkStream = Files.walk(path))
+ {
+            return walkStream.filter(p -> p.getFileName().endsWith(target) || p.toString().contains("/" + target + "/"))
+ .collect(Collectors.toList());
+ }
+ catch (IOException e)
+ {
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+    protected UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException
+ {
+        properties.set(CassandraRelevantProperties.SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS, 0);
+        properties.set(CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS, 1);
+        properties.set(CassandraRelevantProperties.SNAPSHOT_MIN_ALLOWED_TTL_SECONDS, 5);
+
+ Versions versions = Versions.find();
+        Versions.Version requestedVersion = versions.getLatest(new Semver(testVersion.version(),
+                                                                          Semver.SemverType.LOOSE));
+        UpgradeableCluster.Builder clusterBuilder =
+        UpgradeableCluster.build(1)
+                          .withDynamicPortAllocation(true)
+                          .withVersion(requestedVersion)
+                          .withDataDirCount(1)
+                          .withDCs(1)
+                          .withConfig(config -> config.with(Feature.NATIVE_PROTOCOL)
+ .with(Feature.GOSSIP)
+ .with(Feature.JMX));
+
+ return clusterBuilder.start();
+ }
+
+ @Override
+ protected void afterClusterShutdown()
+ {
+ properties.close();
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS %s (c1 int, c2 text, PRIMARY KEY(c1));";
+        createTestTable(TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY, createTableStatement);
+        populateTable(TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY);
+        createTestTable(TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY, createTableStatement);
+ populateTable(TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY);
+ }
+
+ void populateTable(QualifiedName tableName)
+ {
+ for (int i = 0; i < DATASET.size(); i++)
+ {
+ String value = DATASET.get(i);
+            String query = String.format("INSERT INTO %s (c1, c2) VALUES (%d, '%s');", tableName, i, value);
+ cluster.getFirstRunningInstance()
+ .coordinator()
+ .execute(query, ConsistencyLevel.ALL);
+ }
+ }
+}
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
index 173691b..cd06425 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
@@ -225,4 +225,15 @@ public final class MapUtils
{
return options.getOrDefault(lowerCaseKey(key), defaultValue);
}
+
+    /**
+     * Checks whether the given key is present in the {@code options} map.
+     * @param options the map
+     * @param key the key to look up in the map
+     * @return {@code true} if the key is present, {@code false} otherwise
+     */
+ public static boolean containsKey(Map<String, String> options, String key)
+ {
+ return options.containsKey(lowerCaseKey(key));
+ }
}
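
The new helper follows the same lower-cased-key convention as the other MapUtils accessors, which is how ClientConfig detects the deprecated clearSnapshot option. A short usage sketch (not part of this patch; the class name is illustrative and the cassandra-bridge module is assumed to be on the classpath):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.cassandra.spark.utils.MapUtils;

public class ContainsKeyCheck
{
    public static void main(String[] args)
    {
        // Reader options are keyed by their lower-cased names, as in ClientConfigTests above
        Map<String, String> options = new HashMap<>();
        options.put("clearsnapshot", "true");

        // The lookup key is lower-cased before the check, so the camel-cased option name matches
        System.out.println(MapUtils.containsKey(options, "clearSnapshot"));         // true
        System.out.println(MapUtils.containsKey(options, "clearSnapshotStrategy")); // false
    }
}
```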
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]