This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c20b45  CASSANDRA-19273: Allow setting TTL for snapshots created
8c20b45 is described below

commit 8c20b452dd0728a6fad6d276a7be9fa1b9274495
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Wed Jan 10 11:49:44 2024 -0800

    CASSANDRA-19273: Allow setting TTL for snapshots created
    
    Patch by Saranya Krishnakumar; Reviewed by Yifan Cai, Francisco Guerrero for CASSANDRA-19273
---
 CHANGES.txt                                        |   1 +
 DEV-README.md                                      |  12 ++
 cassandra-analytics-core/build.gradle              |   1 +
 .../cassandra/spark/data/CassandraDataLayer.java   |  49 ++---
 .../apache/cassandra/spark/data/ClientConfig.java  | 209 +++++++++++++++++++++
 .../spark/data/CassandraDataLayerTests.java        |  69 +++++++
 .../cassandra/spark/data/ClientConfigTests.java    | 135 +++++++++++++
 .../apache/cassandra/analytics/SparkTestUtils.java |   2 +-
 .../analytics/data/ClearSnapshotTest.java          | 176 +++++++++++++++++
 .../org/apache/cassandra/spark/utils/MapUtils.java |  11 ++
 10 files changed, 642 insertions(+), 23 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 88d8100..a987271 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Allow setting TTL for snapshots created by Analytics bulk reader (CASSANDRA-19273)
  * Fix range split and use open-closed range notation consistently (CASSANDRA-19325)
  * Add integration tests using in-jvm-dtest to cover blocklisted instances (CASSANDRA-19272)
  * Fix bulk writer consistency level validations for blocked instances (CASSANDRA-19257)
diff --git a/DEV-README.md b/DEV-README.md
index 2a2f818..2b3bec6 100644
--- a/DEV-README.md
+++ b/DEV-README.md
@@ -89,6 +89,18 @@ To enable git hooks, run the following command at project root.
 git config core.hooksPath githooks
 ```
 
+## Running Integration Tests
+
+To run integration tests, build the dependencies by following the instructions in the Dependencies section and
+configure the IP aliases needed for the integration tests.
+
+### macOS network aliases
+Create a temporary alias for every node except the first:
+
+```shell
+ for i in {2..20}; do sudo ifconfig lo0 alias "127.0.0.${i}"; done
+```
+
 ## IntelliJ
 
 The project is well-supported in IntelliJ.
diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle
index 7bc4ed5..9228741 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -70,6 +70,7 @@ project(':cassandra-analytics-core') {
         testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
         testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}")
         testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}")
+        testImplementation("org.assertj:assertj-core:3.24.2")
         testImplementation(group: 'org.quicktheories', name: 'quicktheories', version: "${project.rootProject.quickTheoriesVersion}")
         testImplementation(group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26')
         testImplementation(group: 'org.mockito', name: 'mockito-core', version: "${project.rootProject.mockitoVersion}")
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 675cd1f..5a0ca5a 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -260,7 +260,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
             // Use create snapshot request to capture instance availability hint
             LOGGER.info("Creating snapshot snapshotName={} keyspace={} table={} dc={}",
                         snapshotName, maybeQuotedKeyspace, maybeQuotedTable, datacenter);
-            snapshotFuture = ringFuture.thenCompose(this::createSnapshot);
+            snapshotFuture = ringFuture.thenCompose(ringResponse -> createSnapshot(options, ringResponse));
         }
         else
         {
@@ -290,8 +290,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
 
     protected void shutdownHook(ClientConfig options)
     {
-        // Preserves previous behavior, but we may just want to check for the clearSnapshot option in the future
-        if (options.clearSnapshot())
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = options.clearSnapshotStrategy();
+        if (clearSnapshotStrategy.shouldClearOnCompletion())
         {
             if (options.createSnapshot())
             {
@@ -305,6 +305,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                             snapshotName, keyspace, table, datacenter);
             }
         }
+        else if (clearSnapshotStrategy.hasTTL())
+        {
+            LOGGER.warn("Skipping clearing snapshot because 
clearSnapshotStrategy '{}' is used", clearSnapshotStrategy);
+        }
 
         try
         {
@@ -316,7 +320,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         }
     }
 
-    private CompletionStage<Map<String, AvailabilityHint>> createSnapshot(RingResponse ring)
+    private CompletionStage<Map<String, AvailabilityHint>> createSnapshot(ClientConfig options, RingResponse ring)
     {
         Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints = new ConcurrentHashMap<>(ring.size());
 
@@ -341,24 +345,25 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                     LOGGER.info("Creating snapshot on instance snapshotName={} keyspace={} table={} datacenter={} fqdn={}",
                                 snapshotName, maybeQuotedKeyspace, maybeQuotedTable, datacenter, ringEntry.fqdn());
                     SidecarInstance sidecarInstance = new SidecarInstanceImpl(ringEntry.fqdn(), sidecarClientConfig.effectivePort());
-                    createSnapshotFuture = sidecar
-                                           .createSnapshot(sidecarInstance, maybeQuotedKeyspace, maybeQuotedTable, snapshotName)
-                                           .handle((resp, throwable) -> {
-                                               if (throwable == null)
-                                               {
-                                                   // Create snapshot succeeded
-                                                   return hint;
-                                               }
-
-                                               if (isExhausted(throwable))
-                                               {
-                                                   LOGGER.warn("Failed to create snapshot on instance", throwable);
-                                                   return PartitionedDataLayer.AvailabilityHint.DOWN;
-                                               }
-
-                                               LOGGER.error("Unexpected error creating snapshot on instance", throwable);
-                                               return PartitionedDataLayer.AvailabilityHint.UNKNOWN;
-                                           });
+                    ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = options.clearSnapshotStrategy();
+                    createSnapshotFuture = sidecar.createSnapshot(sidecarInstance, maybeQuotedKeyspace,
+                                                                  maybeQuotedTable, snapshotName, clearSnapshotStrategy.ttl())
+                                                  .handle((resp, throwable) -> {
+                                                      if (throwable == null)
+                                                      {
+                                                          // Create snapshot succeeded
+                                                          return hint;
+                                                      }
+
+                                                      if (isExhausted(throwable))
+                                                      {
+                                                          LOGGER.warn("Failed to create snapshot on instance", throwable);
+                                                          return PartitionedDataLayer.AvailabilityHint.DOWN;
+                                                      }
+
+                                                      LOGGER.error("Unexpected error creating snapshot on instance", throwable);
+                                                      return PartitionedDataLayer.AvailabilityHint.UNKNOWN;
+                                                  });
                 }
 
                 return createSnapshotFuture
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
index d1941ee..5330f1b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
@@ -24,18 +24,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.bridge.BigNumberConfigImpl;
 import org.apache.cassandra.spark.config.SchemaFeature;
 import org.apache.cassandra.spark.config.SchemaFeatureSet;
 import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
 import org.apache.cassandra.spark.utils.MapUtils;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.cassandra.spark.data.CassandraDataLayer.aliasLastModifiedTimestamp;
 
 public final class ClientConfig
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClientConfig.class);
+
     public static final String SIDECAR_INSTANCES = "sidecar_instances";
     public static final String KEYSPACE_KEY = "keyspace";
     public static final String TABLE_KEY = "table";
@@ -43,6 +50,19 @@ public final class ClientConfig
     public static final String DC_KEY = "dc";
     public static final String CREATE_SNAPSHOT_KEY = "createSnapshot";
     public static final String CLEAR_SNAPSHOT_KEY = "clearSnapshot";
+    /**
+     * The clearSnapshotStrategy option has the format {strategy [snapshotTTLvalue]}; it holds the strategy and,
+     * for TTL-based strategies, the TTL value. For example: onCompletionOrTTL 2d, TTL 2d, noOp, onCompletion.
+     * For the allowed clear snapshot strategies, see {@link ClearSnapshotStrategy}.
+     */
+    public static final String CLEAR_SNAPSHOT_STRATEGY_KEY = "clearSnapshotStrategy";
+    /**
+     * The TTL value is the time-to-live for the snapshot (available since Cassandra 4.1). The value must include
+     * a unit, e.g. 2d represents a TTL of 2 days and 1h represents a TTL of 1 hour.
+     * Valid units are {@code d}, {@code h}, {@code m} and {@code s}.
+     */
+    public static final String DEFAULT_SNAPSHOT_TTL_VALUE = "2d";
+    public static final String SNAPSHOT_TTL_PATTERN = "\\d+(d|h|m|s)";
     public static final String DEFAULT_PARALLELISM_KEY = "defaultParallelism";
     public static final String NUM_CORES_KEY = "numCores";
     public static final String CONSISTENCY_LEVEL_KEY = "consistencyLevel";
@@ -67,6 +87,7 @@ public final class ClientConfig
     private final String datacenter;
     private final boolean createSnapshot;
     private final boolean clearSnapshot;
+    private final ClearSnapshotStrategy clearSnapshotStrategy;
     private final int defaultParallelism;
     private final int numCores;
     private final ConsistencyLevel consistencyLevel;
@@ -91,6 +112,11 @@ public final class ClientConfig
         this.datacenter = options.get(MapUtils.lowerCaseKey(DC_KEY));
         this.createSnapshot = MapUtils.getBoolean(options, CREATE_SNAPSHOT_KEY, true);
         this.clearSnapshot = MapUtils.getBoolean(options, CLEAR_SNAPSHOT_KEY, createSnapshot);
+        String clearSnapshotStrategyOption = MapUtils.getOrDefault(options, CLEAR_SNAPSHOT_STRATEGY_KEY, null);
+
+        this.clearSnapshotStrategy = parseClearSnapshotStrategy(MapUtils.containsKey(options, CLEAR_SNAPSHOT_KEY),
+                                                                clearSnapshot,
+                                                                clearSnapshotStrategyOption);
         this.defaultParallelism = MapUtils.getInt(options, DEFAULT_PARALLELISM_KEY, 1);
         this.numCores = MapUtils.getInt(options, NUM_CORES_KEY, 1);
         this.consistencyLevel = Optional.ofNullable(options.get(MapUtils.lowerCaseKey(CONSISTENCY_LEVEL_KEY)))
@@ -109,6 +135,50 @@ public final class ClientConfig
         this.quoteIdentifiers = MapUtils.getBoolean(options, QUOTE_IDENTIFIERS, false);
     }
 
+    protected ClearSnapshotStrategy parseClearSnapshotStrategy(boolean hasDeprecatedOption,
+                                                               boolean clearSnapshot,
+                                                               String clearSnapshotStrategyOption)
+    {
+        if (hasDeprecatedOption)
+        {
+            LOGGER.warn("The deprecated option 'clearSnapshot' is set. Please set 'clearSnapshotStrategy' instead.");
+            if (clearSnapshotStrategyOption == null)
+            {
+                return clearSnapshot ? ClearSnapshotStrategy.defaultStrategy() : new ClearSnapshotStrategy.NoOp();
+            }
+        }
+        if (clearSnapshotStrategyOption == null)
+        {
+            LOGGER.debug("No clearSnapshotStrategy is set. Using the default strategy");
+            return ClearSnapshotStrategy.defaultStrategy();
+        }
+        String[] strategyParts = clearSnapshotStrategyOption.split(" ", 2);
+        String strategyName;
+        String snapshotTTL = null;
+        if (strategyParts.length == 1)
+        {
+            strategyName = strategyParts[0].trim();
+        }
+        else if (strategyParts.length == 2)
+        {
+            strategyName = strategyParts[0].trim();
+            snapshotTTL = strategyParts[1].trim();
+            if (!Pattern.matches(SNAPSHOT_TTL_PATTERN, snapshotTTL))
+            {
+                String msg = "Incorrect value set for clearSnapshotStrategy, 
expected format is " +
+                             "{strategy [snapshotTTLvalue]}. TTL value 
specified must contain unit along. " +
+                             "For e.g. 2d represents a TTL for 2 days. Allowed 
units are d, h, m and s.";
+                throw new IllegalArgumentException(msg);
+            }
+        }
+        else
+        {
+            LOGGER.error("Invalid value for ClearSnapshotStrategy: '{}'", 
clearSnapshotStrategyOption);
+            throw new IllegalArgumentException("Invalid value: " + 
clearSnapshotStrategyOption);
+        }
+        return ClearSnapshotStrategy.create(strategyName, snapshotTTL);
+    }
+
     public String sidecarInstances()
     {
         return sidecarInstances;
@@ -146,6 +216,11 @@ public final class ClientConfig
         return clearSnapshot;
     }
 
+    public ClearSnapshotStrategy clearSnapshotStrategy()
+    {
+        return clearSnapshotStrategy;
+    }
+
     public int defaultParallelism()
     {
         return defaultParallelism;
@@ -237,4 +312,138 @@ public final class ClientConfig
         }
         return requestedFeatures;
     }
+
+    abstract static class ClearSnapshotStrategy
+    {
+        private final String snapshotTTL;
+
+        static ClearSnapshotStrategy create(String name, String snapshotTTL)
+        {
+            String stripped = name.trim();
+            if (stripped.equalsIgnoreCase(OnCompletion.class.getSimpleName()))
+            {
+                return new OnCompletion();
+            }
+            else if (stripped.equalsIgnoreCase(TTL.class.getSimpleName()))
+            {
+                return new TTL(snapshotTTL);
+            }
+            else if (stripped.equalsIgnoreCase(OnCompletionOrTTL.class.getSimpleName()))
+            {
+                return new OnCompletionOrTTL(snapshotTTL);
+            }
+            else if (stripped.equalsIgnoreCase(NoOp.class.getSimpleName()))
+            {
+                return new NoOp();
+            }
+            else
+            {
+                ClearSnapshotStrategy defaultStrategy = defaultStrategy();
+                LOGGER.warn("Unknown ClearSnapshotStrategy {} is passed. Fall 
back to default strategy {}.",
+                            name, defaultStrategy);
+                throw new IllegalArgumentException("Invalid 
ClearSnapshotStrategy " + name + " passed");
+            }
+        }
+
+        public static ClearSnapshotStrategy defaultStrategy()
+        {
+            LOGGER.info("A default TTL value of {} is added to the snapshot. 
If the job takes longer than {}, " +
+                        "the snapshot will be cleared before job completion 
leading to errors.",
+                        DEFAULT_SNAPSHOT_TTL_VALUE, 
DEFAULT_SNAPSHOT_TTL_VALUE);
+            return new OnCompletionOrTTL(DEFAULT_SNAPSHOT_TTL_VALUE);
+        }
+
+        abstract boolean shouldClearOnCompletion();
+
+        void validateTTLPresence(boolean expectTTL)
+        {
+            if (expectTTL && !hasTTL())
+            {
+                throw new IllegalArgumentException("Incorrect value set for 
clearSnapshotStrategy, expected format " +
+                                                   "is {strategy 
[snapshotTTLvalue]}. TTL value specified must " +
+                                                   "contain unit along. For 
e.g. 2d represents a TTL for 2 days. " +
+                                                   "Allowed units are d, h, m 
and s.");
+            }
+        }
+
+        boolean hasTTL()
+        {
+            return snapshotTTL != null && !snapshotTTL.isEmpty();
+        }
+
+        @Nullable
+        String ttl()
+        {
+            return snapshotTTL;
+        }
+
+        @Override
+        public String toString()
+        {
+            return this.getClass().getSimpleName() + (hasTTL() ? ' ' + ttl() : "");
+        }
+
+        protected ClearSnapshotStrategy(String snapshotTTL)
+        {
+            this.snapshotTTL = snapshotTTL;
+        }
+
+        static class OnCompletion extends ClearSnapshotStrategy
+        {
+            protected OnCompletion()
+            {
+                super(null);
+            }
+
+            @Override
+            boolean shouldClearOnCompletion()
+            {
+                return true;
+            }
+        }
+
+        static class NoOp extends ClearSnapshotStrategy
+        {
+            protected NoOp()
+            {
+                super(null);
+            }
+
+            @Override
+            boolean shouldClearOnCompletion()
+            {
+                return false;
+            }
+        }
+
+        static class OnCompletionOrTTL extends ClearSnapshotStrategy
+        {
+            protected OnCompletionOrTTL(@NotNull String snapshotTTL)
+            {
+                super(snapshotTTL);
+                validateTTLPresence(true);
+            }
+
+            @Override
+            boolean shouldClearOnCompletion()
+            {
+                return true;
+            }
+        }
+
+        static class TTL extends ClearSnapshotStrategy
+        {
+            protected TTL(@NotNull String snapshotTTL)
+            {
+                super(snapshotTTL);
+                validateTTLPresence(true);
+            }
+
+            @Override
+            boolean shouldClearOnCompletion()
+            {
+                return false;
+            }
+        }
+    }
 }
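
For reference, the sketch below shows how a Spark job could pass the new clearSnapshotStrategy option to the Analytics bulk reader, using the same option keys that ClientConfig parses above. It is a minimal, hypothetical example: the DataSource format name and the Spark session setup are assumptions for illustration and are not taken from this commit.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class BulkReaderSnapshotTtlExample
    {
        public static void main(String[] args)
        {
            SparkSession spark = SparkSession.builder()
                                             .appName("bulk-reader-snapshot-ttl")
                                             .getOrCreate();

            // Option keys mirror ClientConfig: sidecar_instances, keyspace, table, clearSnapshotStrategy
            Dataset<Row> df = spark.read()
                                   .format("cassandra-analytics-bulk-reader") // placeholder format name, not from this commit
                                   .option("sidecar_instances", "localhost")
                                   .option("keyspace", "big-data")
                                   .option("table", "customers")
                                   // keep the snapshot for 2 days and also clear it when the job completes
                                   .option("clearSnapshotStrategy", "onCompletionOrTTL 2d")
                                   .load();

            df.show(10);
            spark.stop();
        }
    }

A plain "TTL 10s" or "noOp" value works the same way, as exercised by ClearSnapshotTest further below.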
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
new file mode 100644
index 0000000..6baf693
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class CassandraDataLayerTests
+{
+    public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS = 
ImmutableMap.of(
+    "keyspace", "big-data",
+    "table", "customers",
+    "sidecar_instances", "localhost");
+
+    @Test
+    void testDefaultClearSnapshotStrategy()
+    {
+        Map<String, String> options = new 
HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        ClientConfig clientConfig = ClientConfig.create(options);
+        assertEquals("big-data", clientConfig.keyspace());
+        assertEquals("customers", clientConfig.table());
+        assertEquals("localhost", clientConfig.sidecarInstances());
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = clientConfig.clearSnapshotStrategy();
+        assertTrue(clearSnapshotStrategy.shouldClearOnCompletion());
+        assertEquals("2d", clearSnapshotStrategy.ttl());
+    }
+
+    @ParameterizedTest
+    @CsvSource({"false, NOOP", "true,ONCOMPLETIONORTTL 2d"})
+    void testClearSnapshotOptionSupport(Boolean clearSnapshot, String expectedClearSnapshotStrategyOption)
+    {
+        Map<String, String> options = new 
HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("clearsnapshot", clearSnapshot.toString());
+        ClientConfig clientConfig = ClientConfig.create(options);
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy = 
clientConfig.clearSnapshotStrategy();
+        ClientConfig.ClearSnapshotStrategy expectedClearSnapshotStrategy
+        = clientConfig.parseClearSnapshotStrategy(false, false, 
expectedClearSnapshotStrategyOption);
+        assertThat(clearSnapshotStrategy.shouldClearOnCompletion())
+        .isEqualTo(expectedClearSnapshotStrategy.shouldClearOnCompletion());
+        
assertThat(clearSnapshotStrategy.hasTTL()).isEqualTo(expectedClearSnapshotStrategy.hasTTL());
+        
assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedClearSnapshotStrategy.ttl());
+    }
+}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
new file mode 100644
index 0000000..7e82c91
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.apache.cassandra.spark.data.ClientConfig.SNAPSHOT_TTL_PATTERN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class ClientConfigTests
+{
+    public static final Map<String, String> REQUIRED_CLIENT_CONFIG_OPTIONS = 
ImmutableMap.of(
+    "keyspace", "big-data",
+    "table", "customers",
+    "sidecar_instances", "localhost");
+
+    @ParameterizedTest
+    @ValueSource(strings = {"2h", "200s", "4d", "60m", "  60m", "50s ", " 32d 
"})
+    void testPositiveSnapshotTTLPatterns(String input)
+    {
+        assertThat(input.trim()).matches(SNAPSHOT_TTL_PATTERN);
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"", " ", "2 h", "200", "d", "6 0m", " h", "3.5h", 
".8m", "4.d", "1e+7m"})
+    void testNegativeSnapshotTTLPatterns(String input)
+    {
+        assertThat(input).doesNotMatch(SNAPSHOT_TTL_PATTERN);
+    }
+
+    @ParameterizedTest
+    @CsvSource({"false,false,noop", "true,false,NoOp", "true,true,  Noop ", 
"false,false,noop 50m"})
+    void testValidNoOpClearSnapshotStrategyParsing(boolean 
hasDeprecatedSnapshotOption, boolean clearSnapshot,
+                                                   String option)
+    {
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+        = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption, 
clearSnapshot);
+        
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.NoOp.class);
+        assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isFalse();
+        assertThat(clearSnapshotStrategy.hasTTL()).isFalse();
+        assertThat(clearSnapshotStrategy.ttl()).isNull();
+    }
+
+    @ParameterizedTest
+    @CsvSource({"false,false,tTL 10h,10h", "true,false,  TTL   5d  ,5d", 
"true,true,  Ttl 2m  ,2m"})
+    void testValidTTLClearSnapshotStrategyParsing(boolean 
hasDeprecatedSnapshotOption, boolean clearSnapshot,
+                                                  String option, String 
expectedSnapshotTTL)
+    {
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+        = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption, 
clearSnapshot);
+        
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.TTL.class);
+        assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isFalse();
+        assertThat(clearSnapshotStrategy.hasTTL()).isTrue();
+        assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedSnapshotTTL);
+    }
+
+    @ParameterizedTest
+    @CsvSource({"false,false,onCompletion", "true,false,OnCoMpLeTiOn", 
"true,true,  ONCOMPLETION ",
+                "false,false,OnCoMpLeTiOn 5h"})
+    void testValidOnCompletionClearSnapshotStrategyParsing(boolean 
hasDeprecatedSnapshotOption, boolean clearSnapshot,
+                                                           String option)
+    {
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+        = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption, 
clearSnapshot);
+        
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.OnCompletion.class);
+        assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isTrue();
+        assertThat(clearSnapshotStrategy.hasTTL()).isFalse();
+        assertThat(clearSnapshotStrategy.ttl()).isNull();
+    }
+
+    @ParameterizedTest
+    @CsvSource({"false,false,onCompletionOrTTL 200m, 200m", 
"true,false,oNcOmPlEtIoNoRtTL   0560m,0560m",
+                "true,true,  ONCOMPLETIONORTTL  3d, 3d"})
+    void testValidOnCompletionOrTTLClearSnapshotStrategyParsing(boolean 
hasDeprecatedSnapshotOption,
+                                                                boolean 
clearSnapshot, String option,
+                                                                String 
expectedSnapshotTTL)
+    {
+        ClientConfig.ClearSnapshotStrategy clearSnapshotStrategy
+        = getClearSnapshotStrategy(option, hasDeprecatedSnapshotOption, 
clearSnapshot);
+        
assertThat(clearSnapshotStrategy).isInstanceOf(ClientConfig.ClearSnapshotStrategy.OnCompletionOrTTL.class);
+        assertThat(clearSnapshotStrategy.shouldClearOnCompletion()).isTrue();
+        assertThat(clearSnapshotStrategy.hasTTL()).isTrue();
+        assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedSnapshotTTL);
+    }
+
+    @ParameterizedTest
+    @CsvSource({"delete 10h", "ttl5d", "Ttl 2ms", "TTL", "tTL", "No Op", "on 
Completion", "ON COMPLETION 3d",
+                "onCompletionOrTTL ", "oN cOmPlEtIoNoRtTL 560m"})
+    void testInValidClearSnapshotStrategyParsing(String option)
+    {
+        Map<String, String> options = new 
HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("clearsnapshotstrategy", option);
+        assertThatThrownBy(() -> {
+            ClientConfig clientConfig = ClientConfig.create(options);
+            clientConfig.parseClearSnapshotStrategy(false, false, option);
+        })
+        .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    private ClientConfig.ClearSnapshotStrategy getClearSnapshotStrategy(String clearSnapshotStrategyOption,
+                                                                        boolean hasDeprecatedSnapshotOption,
+                                                                        boolean clearSnapshot)
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("clearsnapshotstrategy", clearSnapshotStrategyOption);
+        ClientConfig clientConfig = ClientConfig.create(options);
+        return clientConfig.parseClearSnapshotStrategy(hasDeprecatedSnapshotOption,
+                                                       clearSnapshot,
+                                                       clearSnapshotStrategyOption);
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 8e719f0..3697956 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -77,7 +77,7 @@ public final class SparkTestUtils
                   // Shutdown hooks are called after the job ends, and in the case of integration tests
                   // the sidecar is already shut down before this. Since the cluster will be torn
                   // down anyway, the integration job skips clearing snapshots.
-                  .option("clearSnapshot", "false")
+                  .option("clearSnapshotStrategy", "noop")
                   .option("defaultParallelism", sc.defaultParallelism())
                   .option("numCores", numCores)
                   .option("sizing", "default")
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/data/ClearSnapshotTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/data/ClearSnapshotTest.java
new file mode 100644
index 0000000..a363e44
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/data/ClearSnapshotTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.data;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+import com.vdurmont.semver4j.Semver;
+import org.apache.cassandra.analytics.SharedClusterSparkIntegrationTestBase;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.distributed.shared.WithProperties;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.testing.TestVersion;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ClearSnapshotTest extends SharedClusterSparkIntegrationTestBase
+{
+    private static final WithProperties properties = new WithProperties();
+    static final QualifiedName TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY
+    = new QualifiedName(TEST_KEYSPACE, "test_ttl_clear_snapshot_strategy");
+    static final QualifiedName TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY
+    = new QualifiedName(TEST_KEYSPACE, "test_no_op_clear_snapshot_strategy");
+    static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", 
"f", "g", "h");
+
+    @Test
+    void testTTLClearSnapshotStrategy()
+    {
+        DataFrameReader readDf = bulkReaderDataFrame(TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY)
+                                 .option("snapshotName", "ttlClearSnapshotStrategyTest")
+                                 .option("clearSnapshotStrategy", "TTL 10s");
+        List<Row> rows = readDf.load().collectAsList();
+        assertThat(rows.size()).isEqualTo(8);
+
+        String[] dataDirs = (String[]) cluster.getFirstRunningInstance()
+                                              .config()
+                                              .getParams()
+                                              .get("data_file_directories");
+        String dataDir = dataDirs[0];
+        List<Path> snapshotPaths = findChildFile(Paths.get(dataDir), 
"ttlClearSnapshotStrategyTest");
+        assertThat(snapshotPaths).isNotEmpty();
+        Path snapshot = snapshotPaths.get(0);
+        assertThat(snapshot).exists();
+
+        // Wait up to 30 seconds to make sure files are cleared after TTLs have expired
+        int wait = 0;
+        while (Files.exists(snapshot) && wait++ < 30)
+        {
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        assertThat(snapshot).doesNotExist();
+    }
+
+    @Test
+    void testNoOpClearSnapshotStrategy()
+    {
+        DataFrameReader readDf = bulkReaderDataFrame(TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY)
+                                 .option("snapshotName", "noOpClearSnapshotStrategyTest")
+                                 .option("clearSnapshotStrategy", "noOp");
+        List<Row> rows = readDf.load().collectAsList();
+        assertThat(rows.size()).isEqualTo(8);
+
+        String[] dataDirs = (String[]) cluster.getFirstRunningInstance()
+                                              .config()
+                                              .getParams()
+                                              .get("data_file_directories");
+        String dataDir = dataDirs[0];
+        List<Path> snapshotPaths = findChildFile(Paths.get(dataDir), 
"noOpClearSnapshotStrategyTest");
+        assertThat(snapshotPaths).isNotEmpty();
+        Path snapshot = snapshotPaths.get(0);
+        assertThat(snapshot).exists();
+
+        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
+        assertThat(snapshot).exists();
+    }
+
+    private List<Path> findChildFile(Path path, String target)
+    {
+        try (Stream<Path> walkStream = Files.walk(path))
+        {
+            return walkStream.filter(p -> p.getFileName().endsWith(target) || p.toString().contains("/" + target + "/"))
+                             .collect(Collectors.toList());
+        }
+        catch (IOException e)
+        {
+            return Collections.emptyList();
+        }
+    }
+
+    @Override
+    protected UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException
+    {
+        properties.set(CassandraRelevantProperties.SNAPSHOT_CLEANUP_INITIAL_DELAY_SECONDS, 0);
+        properties.set(CassandraRelevantProperties.SNAPSHOT_CLEANUP_PERIOD_SECONDS, 1);
+        properties.set(CassandraRelevantProperties.SNAPSHOT_MIN_ALLOWED_TTL_SECONDS, 5);
+
+        Versions versions = Versions.find();
+        Versions.Version requestedVersion = versions.getLatest(new Semver(testVersion.version(),
+                                                                          Semver.SemverType.LOOSE));
+        UpgradeableCluster.Builder clusterBuilder =
+        UpgradeableCluster.build(1)
+                          .withDynamicPortAllocation(true)
+                          .withVersion(requestedVersion)
+                          .withDataDirCount(1)
+                          .withDCs(1)
+                          .withConfig(config -> config.with(Feature.NATIVE_PROTOCOL)
+                                                      .with(Feature.GOSSIP)
+                                                      .with(Feature.JMX));
+
+        return clusterBuilder.start();
+    }
+
+    @Override
+    protected void afterClusterShutdown()
+    {
+        properties.close();
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS %s (c1 int, c2 text, PRIMARY KEY(c1));";
+        createTestTable(TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY, createTableStatement);
+        populateTable(TABLE_NAME_FOR_TTL_CLEAR_SNAPSHOT_STRATEGY);
+        createTestTable(TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY, createTableStatement);
+        populateTable(TABLE_NAME_FOR_NO_OP_CLEAR_SNAPSHOT_STRATEGY);
+    }
+
+    void populateTable(QualifiedName tableName)
+    {
+        for (int i = 0; i < DATASET.size(); i++)
+        {
+            String value = DATASET.get(i);
+            String query = String.format("INSERT INTO %s (c1, c2) VALUES (%d, '%s');", tableName, i, value);
+            cluster.getFirstRunningInstance()
+                   .coordinator()
+                   .execute(query, ConsistencyLevel.ALL);
+        }
+    }
+}
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
index 173691b..cd06425 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java
@@ -225,4 +225,15 @@ public final class MapUtils
     {
         return options.getOrDefault(lowerCaseKey(key), defaultValue);
     }
+
+    /**
+     * Checks whether the given key is present in the {@code options} map.
+     * @param options   the map
+     * @param key       the key to look up in the map
+     * @return {@code true} if the key is present in {@code options}, {@code false} otherwise
+     */
+    public static boolean containsKey(Map<String, String> options, String key)
+    {
+        return options.containsKey(lowerCaseKey(key));
+    }
 }
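
As a usage note, the new MapUtils.containsKey lower-cases the key the same way the other MapUtils accessors do, so option lookups stay case-insensitive. Below is a minimal sketch of that behavior; the option map contents are made up for illustration.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.cassandra.spark.utils.MapUtils;

    public class ContainsKeyExample
    {
        public static void main(String[] args)
        {
            // Options are stored with lower-cased keys, as ClientConfig does for clearSnapshot
            Map<String, String> options = new HashMap<>();
            options.put("clearsnapshot", "true");

            // The mixed-case key is lower-cased before the lookup, so this prints "true"
            System.out.println(MapUtils.containsKey(options, "clearSnapshot"));
        }
    }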


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

