This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd36cb9f6b implement startup check to prevent Cassandra from 
potentially spreading zombie data
bd36cb9f6b is described below

commit bd36cb9f6b8cc6339f43382bf6625794b641552b
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Mon Mar 28 19:46:11 2022 +0200

    implement startup check to prevent Cassandra from potentially spreading 
zombie data
    
    patch by Stefan Miklosovic; reviewed by Paulo Motta and Brandon Williams 
for CASSANDRA-17180
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  19 +-
 .../config/CassandraRelevantProperties.java        |   2 +
 .../apache/cassandra/schema/SchemaKeyspace.java    |   3 +-
 .../cassandra/service/DataResurrectionCheck.java   | 308 +++++++++++++++++++++
 .../service/FileSystemOwnershipCheck.java          |   4 +-
 .../org/apache/cassandra/service/StartupCheck.java |   9 +
 .../apache/cassandra/service/StartupChecks.java    |  26 +-
 .../service/snapshot/SnapshotManifest.java         |  26 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  28 +-
 .../test/DataResurrectionCheckTest.java            | 184 ++++++++++++
 .../config/DatabaseDescriptorRefTest.java          |   2 +
 .../cassandra/config/StartupCheckOptionsTest.java  |  70 ++++-
 .../AbstractFilesystemOwnershipCheckTest.java      |   8 +-
 .../cassandra/service/StartupChecksTest.java       |  65 ++++-
 .../YamlBasedFileSystemOwnershipCheckTest.java     |   6 +-
 16 files changed, 693 insertions(+), 68 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ff466d831d..c36f9d2eda 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Implement startup check to prevent Cassandra from potentially spreading 
zombie data (CASSANDRA-17180)
  * Allow failing startup on duplicate config keys (CASSANDRA-17379)
  * Migrate threshold for minimum keyspace replication factor to guardrails 
(CASSANDRA-17212)
  * Add guardrail to disallow TRUNCATE and DROP TABLE commands (CASSANDRA-17558)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 89feab4b12..f846c7148a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1696,12 +1696,23 @@ drop_compact_storage_enabled: false
 # Uncomment the startup checks and configure them appropriately to cover your 
needs.
 #
 #startup_checks:
-#  filesystem_ownership:
+# Verifies correct ownership of attached locations on disk at startup. See 
CASSANDRA-16879 for more details.
+#  check_filesystem_ownership:
 #    enabled: false
 #    ownership_token: "sometoken" # (overriden by "CassandraOwnershipToken" 
system property)
 #    ownership_filename: ".cassandra_fs_ownership" # (overriden by 
"cassandra.fs_ownership_filename")
-#  dc:
+# Prevents a node from starting if snitch's data center differs from previous 
data center.
+#  check_dc:
 #    enabled: true # (overriden by cassandra.ignore_dc system property)
-#  rack:
+# Prevents a node from starting if snitch's rack differs from previous rack.
+#  check_rack:
 #    enabled: true # (overriden by cassandra.ignore_rack system property)
-
+# Enable this property to fail startup if the node is down for longer than 
gc_grace_seconds, to potentially
+# prevent data resurrection on tables with deletes. By default, this will run 
against all keyspaces and tables
+# except the ones specified on excluded_keyspaces and excluded_tables.
+#  check_data_resurrection:
+#    enabled: false
+# file where Cassandra periodically writes the last time it was known to run
+#    heartbeat_file: /var/lib/cassandra/data/cassandra-heartbeat
+#    excluded_keyspaces: # comma separated list of keyspaces to exclude from 
the check
+#    excluded_tables: # comma separated list of keyspace.table pairs to 
exclude from the check
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 19b7d2a70d..6eea323976 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -255,6 +255,8 @@ public enum CassandraRelevantProperties
     FILE_SYSTEM_CHECK_OWNERSHIP_FILENAME("cassandra.fs_ownership_filename", 
FileSystemOwnershipCheck.DEFAULT_FS_OWNERSHIP_FILENAME),
     @Deprecated // should be removed in favor of flags in relevant startup 
check (FileSystemOwnershipCheck)
     
FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN(FileSystemOwnershipCheck.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN),
+    // default heartbeating period is 1 minute
+    
CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD("check_data_resurrection_heartbeat_period_milli",
 "60000"),
 
     // defaults to false for 4.1 but plan to switch to true in a later release
     // the thinking is that environments may not work right off the bat so 
safer to add this feature disabled by default
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4f605390d7..dd134f02ee 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -830,8 +830,7 @@ public final class SchemaKeyspace
     /*
      * Fetching schema
      */
-
-    static Keyspaces fetchNonSystemKeyspaces()
+    public static Keyspaces fetchNonSystemKeyspaces()
     {
         return 
fetchKeyspacesWithout(SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
     }
diff --git a/src/java/org/apache/cassandra/service/DataResurrectionCheck.java 
b/src/java/org/apache/cassandra/service/DataResurrectionCheck.java
new file mode 100644
index 0000000000..2c3b035f9d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/DataResurrectionCheck.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.cassandra.exceptions.StartupException.ERR_WRONG_DISK_STATE;
+import static 
org.apache.cassandra.exceptions.StartupException.ERR_WRONG_MACHINE_STATE;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+
+public class DataResurrectionCheck implements StartupCheck
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DataResurrectionCheck.class);
+
+    public static final String HEARTBEAT_FILE_CONFIG_PROPERTY = 
"heartbeat_file";
+    public static final String EXCLUDED_KEYSPACES_CONFIG_PROPERTY = 
"excluded_keyspaces";
+    public static final String EXCLUDED_TABLES_CONFIG_PROPERTY = 
"excluded_tables";
+
+    public static final String DEFAULT_HEARTBEAT_FILE = "cassandra-heartbeat";
+
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class Heartbeat
+    {
+        @JsonProperty("last_heartbeat")
+        public final Instant lastHeartbeat;
+
+        /** needed for jackson serialization */
+        @SuppressWarnings("unused")
+        private Heartbeat() {
+            this.lastHeartbeat = null;
+        }
+
+        public Heartbeat(Instant lastHeartbeat)
+        {
+            this.lastHeartbeat = lastHeartbeat;
+        }
+
+        public void serializeToJsonFile(File outputFile) throws IOException
+        {
+            FBUtilities.serializeToJsonFile(this, outputFile);
+        }
+
+        public static Heartbeat deserializeFromJsonFile(File file) throws 
IOException
+        {
+            return FBUtilities.deserializeFromJsonFile(Heartbeat.class, file);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Heartbeat manifest = (Heartbeat) o;
+            return Objects.equals(lastHeartbeat, manifest.lastHeartbeat);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(lastHeartbeat);
+        }
+    }
+
+    @VisibleForTesting
+    static class TableGCPeriod
+    {
+        String table;
+        int gcPeriod;
+
+        TableGCPeriod(String table, int gcPeriod)
+        {
+            this.table = table;
+            this.gcPeriod = gcPeriod;
+        }
+    }
+
+    static File getHeartbeatFile(Map<String, Object> config)
+    {
+        String heartbeatFileConfigValue = (String) 
config.get(HEARTBEAT_FILE_CONFIG_PROPERTY);
+        File heartbeatFile;
+
+        if (heartbeatFileConfigValue != null)
+        {
+            heartbeatFile = new File(heartbeatFileConfigValue);
+        }
+        else
+        {
+            String[] dataFileLocations = 
DatabaseDescriptor.getLocalSystemKeyspacesDataFileLocations();
+            assert dataFileLocations.length != 0;
+            heartbeatFile = new File(dataFileLocations[0], 
DEFAULT_HEARTBEAT_FILE);
+        }
+
+        LOGGER.trace("Resolved heartbeat file for data resurrection check: " + 
heartbeatFile);
+
+        return heartbeatFile;
+    }
+
+    @Override
+    public StartupChecks.StartupCheckType getStartupCheckType()
+    {
+        return StartupChecks.StartupCheckType.check_data_resurrection;
+    }
+
+    @Override
+    public void execute(StartupChecksOptions options) throws StartupException
+    {
+        if (options.isDisabled(getStartupCheckType()))
+            return;
+
+        Map<String, Object> config = 
options.getConfig(StartupChecks.StartupCheckType.check_data_resurrection);
+        File heartbeatFile = getHeartbeatFile(config);
+
+        if (!heartbeatFile.exists())
+        {
+            LOGGER.debug("Heartbeat file {} not found! Skipping heartbeat 
startup check.", heartbeatFile.absolutePath());
+            return;
+        }
+
+        Heartbeat heartbeat;
+
+        try
+        {
+            heartbeat = Heartbeat.deserializeFromJsonFile(heartbeatFile);
+        }
+        catch (IOException ex)
+        {
+            throw new StartupException(ERR_WRONG_DISK_STATE, "Failed to 
deserialize heartbeat file " + heartbeatFile);
+        }
+
+        if (heartbeat.lastHeartbeat == null)
+            return;
+
+        long heartbeatMillis = heartbeat.lastHeartbeat.toEpochMilli();
+
+        List<Pair<String, String>> violations = new ArrayList<>();
+
+        Set<String> excludedKeyspaces = getExcludedKeyspaces(config);
+        Set<Pair<String, String>> excludedTables = getExcludedTables(config);
+
+        long currentTimeMillis = currentTimeMillis();
+
+        for (String keyspace : getKeyspaces())
+        {
+            if (excludedKeyspaces.contains(keyspace))
+                continue;
+
+            for (TableGCPeriod userTable : getTablesGcPeriods(keyspace))
+            {
+                if (excludedTables.contains(Pair.create(keyspace, 
userTable.table)))
+                    continue;
+
+                long gcGraceMillis = ((long) userTable.gcPeriod) * 1000;
+                if (heartbeatMillis + gcGraceMillis < currentTimeMillis)
+                    violations.add(Pair.create(keyspace, userTable.table));
+            }
+        }
+
+        if (!violations.isEmpty())
+        {
+            String invalidTables = violations.stream()
+                                             .map(p -> format("%s.%s", p.left, 
p.right))
+                                             .collect(joining(","));
+
+            String exceptionMessage = format("There are tables for which 
gc_grace_seconds is older " +
+                                             "than the lastly known time 
Cassandra node was up based " +
+                                             "on its heartbeat %s with 
timestamp %s. Cassandra node will not start " +
+                                             "as it would likely introduce 
data consistency " +
+                                             "issues (zombies etc). Please 
resolve these issues manually, " +
+                                             "then remove the heartbeat and 
start the node again. Invalid tables: %s",
+                                             heartbeatFile, 
heartbeat.lastHeartbeat, invalidTables);
+
+            throw new StartupException(ERR_WRONG_MACHINE_STATE, 
exceptionMessage);
+        }
+    }
+
+    @Override
+    public void postAction(StartupChecksOptions options)
+    {
+        // Schedule heartbeating after all checks have passed, not as part of 
the check,
+        // as it might happen that other checks after it might fail, but we 
would be heartbeating already.
+        if 
(options.isEnabled(StartupChecks.StartupCheckType.check_data_resurrection))
+        {
+            Map<String, Object> config = 
options.getConfig(StartupChecks.StartupCheckType.check_data_resurrection);
+            File heartbeatFile = 
DataResurrectionCheck.getHeartbeatFile(config);
+
+            ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() ->
+            {
+                Heartbeat heartbeat = new 
Heartbeat(Instant.ofEpochMilli(Clock.Global.currentTimeMillis()));
+                try
+                {
+                    heartbeatFile.parent().createDirectoriesIfNotExists();
+                    DataResurrectionCheck.LOGGER.trace("writing heartbeat to 
file " + heartbeatFile);
+                    heartbeat.serializeToJsonFile(heartbeatFile);
+                }
+                catch (IOException ex)
+                {
+                    DataResurrectionCheck.LOGGER.error("Unable to serialize 
heartbeat to " + heartbeatFile, ex);
+                }
+            }, 0, 
CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getInt(), 
MILLISECONDS);
+        }
+    }
+
+    @VisibleForTesting
+    public Set<String> getExcludedKeyspaces(Map<String, Object> config)
+    {
+        String excludedKeyspacesConfigValue = (String) 
config.get(EXCLUDED_KEYSPACES_CONFIG_PROPERTY);
+
+        if (excludedKeyspacesConfigValue == null)
+            return Collections.emptySet();
+        else
+            return 
Arrays.stream(excludedKeyspacesConfigValue.trim().split(","))
+                         .map(String::trim)
+                         .collect(toSet());
+    }
+
+    @VisibleForTesting
+    public Set<Pair<String, String>> getExcludedTables(Map<String, Object> 
config)
+    {
+        String excludedKeyspacesConfigValue = (String) 
config.get(EXCLUDED_TABLES_CONFIG_PROPERTY);
+
+        if (excludedKeyspacesConfigValue == null)
+            return Collections.emptySet();
+
+        Set<Pair<String, String>> pairs = new HashSet<>();
+
+        for (String keyspaceTable : 
excludedKeyspacesConfigValue.trim().split(","))
+        {
+            String[] pair = keyspaceTable.trim().split("\\.");
+            if (pair.length != 2)
+                continue;
+
+            pairs.add(Pair.create(pair[0].trim(), pair[1].trim()));
+        }
+
+        return pairs;
+    }
+
+    @VisibleForTesting
+    List<String> getKeyspaces()
+    {
+        return SchemaKeyspace.fetchNonSystemKeyspaces()
+                             .stream()
+                             .map(keyspaceMetadata -> keyspaceMetadata.name)
+                             .collect(toList());
+    }
+
+    @VisibleForTesting
+    List<TableGCPeriod> getTablesGcPeriods(String userKeyspace)
+    {
+        Optional<KeyspaceMetadata> keyspaceMetadata = 
SchemaKeyspace.fetchNonSystemKeyspaces().get(userKeyspace);
+        if (!keyspaceMetadata.isPresent())
+            return Collections.emptyList();
+
+        KeyspaceMetadata ksmd = keyspaceMetadata.get();
+        return ksmd.tables.stream()
+                          .filter(tmd -> tmd.params.gcGraceSeconds > 0)
+                          .map(tmd -> new TableGCPeriod(tmd.name, 
tmd.params.gcGraceSeconds)).collect(toList());
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java 
b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
index 0d214e6d07..04d87c917b 100644
--- a/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
+++ b/src/java/org/apache/cassandra/service/FileSystemOwnershipCheck.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.config.StartupChecksOptions;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.File;
 
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
 
 /**
  * Ownership markers on disk are compatible with the java property file format.
@@ -115,7 +115,7 @@ public class FileSystemOwnershipCheck implements 
StartupCheck
     @Override
     public StartupChecks.StartupCheckType getStartupCheckType()
     {
-        return filesystem_ownership;
+        return check_filesystem_ownership;
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/StartupCheck.java 
b/src/java/org/apache/cassandra/service/StartupCheck.java
index 567120ea65..331b381d2f 100644
--- a/src/java/org/apache/cassandra/service/StartupCheck.java
+++ b/src/java/org/apache/cassandra/service/StartupCheck.java
@@ -55,4 +55,13 @@ public interface StartupCheck
     {
         return StartupCheckType.non_configurable_check;
     }
+
+    /**
+     * Post-hook after all startup checks succeeded.
+     *
+     * @param options startup check options from descriptor
+     */
+    default void postAction(StartupChecksOptions options)
+    {
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java 
b/src/java/org/apache/cassandra/service/StartupChecks.java
index f66bc19b65..93a1d6ca3f 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -104,9 +104,10 @@ public class StartupChecks
     {
         // non-configurable check is always enabled for execution
         non_configurable_check,
-        filesystem_ownership(true),
-        dc,
-        rack;
+        check_filesystem_ownership(true),
+        check_dc,
+        check_rack,
+        check_data_resurrection(true);
 
         public final boolean disabledByDefault;
 
@@ -143,7 +144,8 @@ public class StartupChecks
                                                                       
checkSystemKeyspaceState,
                                                                       
checkDatacenter,
                                                                       
checkRack,
-                                                                      
checkLegacyAuthTables);
+                                                                      
checkLegacyAuthTables,
+                                                                      new 
DataResurrectionCheck());
 
     public StartupChecks withDefaultTests()
     {
@@ -171,6 +173,18 @@ public class StartupChecks
     {
         for (StartupCheck test : preFlightChecks)
             test.execute(options);
+
+        for (StartupCheck test : preFlightChecks)
+        {
+            try
+            {
+                test.postAction(options);
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Failed to run startup check post-action on " + 
test.getStartupCheckType());
+            }
+        }
     }
 
     public static final StartupCheck checkJemalloc = new StartupCheck()
@@ -656,7 +670,7 @@ public class StartupChecks
         @Override
         public StartupCheckType getStartupCheckType()
         {
-            return StartupCheckType.dc;
+            return StartupCheckType.check_dc;
         }
     };
 
@@ -693,7 +707,7 @@ public class StartupChecks
         @Override
         public StartupCheckType getStartupCheckType()
         {
-            return StartupCheckType.rack;
+            return StartupCheckType.check_rack;
         }
     };
 
diff --git 
a/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java 
b/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java
index 0a301fa5e9..c5e696e73b 100644
--- a/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java
+++ b/src/java/org/apache/cassandra/service/snapshot/SnapshotManifest.java
@@ -24,29 +24,19 @@ import java.util.List;
 import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.cassandra.config.DurationSpec;
 import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.FileInputStreamPlus;
-import org.apache.cassandra.io.util.FileOutputStreamPlus;
-
-import static org.apache.cassandra.io.util.File.WriteMode.OVERWRITE;
+import org.apache.cassandra.utils.FBUtilities;
 
 // Only serialize fields
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY,
                 getterVisibility = JsonAutoDetect.Visibility.NONE,
                 setterVisibility = JsonAutoDetect.Visibility.NONE)
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class SnapshotManifest
 {
-    private static final ObjectMapper mapper = new ObjectMapper();
-    static {
-        mapper.registerModule(new JavaTimeModule());
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-    }
-
     @JsonProperty("files")
     public final List<String> files;
 
@@ -88,18 +78,12 @@ public class SnapshotManifest
 
     public void serializeToJsonFile(File outputFile) throws IOException
     {
-        try (FileOutputStreamPlus out = outputFile.newOutputStream(OVERWRITE))
-        {
-            mapper.writeValue((OutputStream) out, this);
-        }
+        FBUtilities.serializeToJsonFile(this, outputFile);
     }
 
     public static SnapshotManifest deserializeFromJsonFile(File file) throws 
IOException
     {
-        try (FileInputStreamPlus in = file.newInputStream())
-        {
-            return mapper.readValue((InputStream) in, SnapshotManifest.class);
-        }
+        return FBUtilities.deserializeFromJsonFile(SnapshotManifest.class, 
file);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 4fdbc9b969..6d210ceb47 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -46,7 +46,11 @@ import javax.annotation.Nullable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
 import org.apache.cassandra.utils.concurrent.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -83,21 +87,23 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.USER_HOME;
+import static org.apache.cassandra.io.util.File.WriteMode.OVERWRITE;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 
 public class FBUtilities
 {
+    private static final ObjectMapper jsonMapper = new ObjectMapper(new 
JsonFactory());
+
     static
     {
         preventIllegalAccessWarnings();
+        jsonMapper.registerModule(new JavaTimeModule());
+        jsonMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
     }
 
     private static final Logger logger = 
LoggerFactory.getLogger(FBUtilities.class);
-
-    private static final ObjectMapper jsonMapper = new ObjectMapper(new 
JsonFactory());
-
     public static final String UNKNOWN_RELEASE_VERSION = "Unknown";
 
     public static final BigInteger TWO = new BigInteger("2");
@@ -837,6 +843,22 @@ public class FBUtilities
         }
     }
 
+    public static void serializeToJsonFile(Object object, File outputFile) 
throws IOException
+    {
+        try (FileOutputStreamPlus out = outputFile.newOutputStream(OVERWRITE))
+        {
+            jsonMapper.writeValue((OutputStream) out, object);
+        }
+    }
+
+    public static <T> T deserializeFromJsonFile(Class<T> tClass, File file) 
throws IOException
+    {
+        try (FileInputStreamPlus in = file.newInputStream())
+        {
+            return jsonMapper.readValue((InputStream) in, tClass);
+        }
+    }
+
     public static String prettyPrintMemory(long size)
     {
         return prettyPrintMemory(size, false);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/DataResurrectionCheckTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/DataResurrectionCheckTest.java
new file mode 100644
index 0000000000..1b297da46f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/DataResurrectionCheckTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.StartupChecksOptions;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.service.DataResurrectionCheck;
+import org.apache.cassandra.service.DataResurrectionCheck.Heartbeat;
+import org.apache.cassandra.service.StartupChecks.StartupCheckType;
+import org.apache.cassandra.utils.Clock.Global;
+
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
+import static org.apache.cassandra.distributed.Cluster.build;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static 
org.apache.cassandra.service.DataResurrectionCheck.DEFAULT_HEARTBEAT_FILE;
+import static 
org.apache.cassandra.service.DataResurrectionCheck.EXCLUDED_KEYSPACES_CONFIG_PROPERTY;
+import static 
org.apache.cassandra.service.DataResurrectionCheck.EXCLUDED_TABLES_CONFIG_PROPERTY;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_data_resurrection;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class DataResurrectionCheckTest extends TestBaseImpl
+{
+    @Test
+    public void testDataResurrectionCheck() throws Exception
+    {
+        try
+        {
+            // set it to 1 hour so the check will not be updated after it is 
written, for test purposes
+            
System.setProperty(CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getKey(),
 "3600000");
+
+            // start the node with the check enabled, it will just pass fine 
as there are no user tables yet
+            // and the system tables are young enough
+            try (Cluster cluster = build().withNodes(1)
+                                          .withDataDirCount(3) // we will 
expect heartbeat to be in the first data dir
+                                          .withConfig(config -> 
config.with(NATIVE_PROTOCOL)
+                                                                      
.set("startup_checks",
+                                                                           
getStartupChecksConfig(ENABLED_PROPERTY, "true")))
+                                          .start())
+            {
+                IInvokableInstance instance = cluster.get(1);
+
+                checkHeartbeat(instance);
+
+                for (String ks : new String[]{ "ks1", "ks2", "ks3" })
+                {
+                    cluster.schemaChange("CREATE KEYSPACE " + ks + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
+                    cluster.schemaChange(format("CREATE TABLE %s.tb1 (pk text 
PRIMARY KEY) WITH gc_grace_seconds = 10", ks));
+                    cluster.schemaChange(format("CREATE TABLE %s.tb2 (pk text 
PRIMARY KEY)", ks));
+                }
+
+                AtomicReference<Throwable> throwable = new AtomicReference<>();
+                // periodically execute the check on a running instance and wait 
until the exception is thrown for all keyspaces
+                // wait for all violations via Awaitility since, due to the 
nature of how the tables were created,
+                // they will not expire on their gc_grace_period at exactly 
the same time
+                await().timeout(1, MINUTES)
+                       .pollInterval(5, SECONDS)
+                       .until(() -> {
+                           Throwable t = executeChecksOnInstance(instance);
+                           if (t == null)
+                               return false;
+                           String message = t.getMessage();
+                           if (!message.contains("ks1") || 
!message.contains("ks2") || !message.contains("ks3"))
+                           {
+                               return false;
+                           }
+                           throwable.set(t);
+                           return true;
+                       });
+
+                assertThat(throwable.get().getMessage(), 
containsString("Invalid tables"));
+                // returned tables in output are not in any particular order
+                // it is how they are returned from system tables
+                assertThat(throwable.get().getMessage(), 
containsString("ks1.tb1"));
+                assertThat(throwable.get().getMessage(), 
containsString("ks2.tb1"));
+                assertThat(throwable.get().getMessage(), 
containsString("ks3.tb1"));
+
+                // exclude failing keyspaces which already expired on their 
gc_grace_seconds, so we will pass the check
+                assertNull(executeChecksOnInstance(instance, 
EXCLUDED_KEYSPACES_CONFIG_PROPERTY, "ks1,ks2,ks3"));
+
+                // exclude failing tables which already expired on their 
gc_grace_seconds, so we will pass the check
+                assertNull(executeChecksOnInstance(instance, 
EXCLUDED_TABLES_CONFIG_PROPERTY, "ks1.tb1,ks2.tb1,ks3.tb1"));
+
+                // exclude failing tables, but not all of them,
+                // so check detects only one table violates the check
+                Throwable t = executeChecksOnInstance(instance, 
EXCLUDED_TABLES_CONFIG_PROPERTY, "ks1.tb1,ks2.tb1");
+
+                assertNotNull(t);
+                assertThat(t.getMessage(), containsString("Invalid tables: 
ks3.tb1"));
+
+                // shadow table exclusion with keyspace exclusion, we have not 
excluded ks3.tb1, but we excluded the whole ks3 keyspace
+                assertNull(executeChecksOnInstance(instance,
+                                                   
EXCLUDED_TABLES_CONFIG_PROPERTY, "ks1.tb1,ks2.tb1",
+                                                   
EXCLUDED_KEYSPACES_CONFIG_PROPERTY, "ks3"));
+            }
+        }
+        finally
+        {
+            
System.clearProperty(CassandraRelevantProperties.CHECK_DATA_RESURRECTION_HEARTBEAT_PERIOD.getKey());
+        }
+    }
+
+    private Throwable executeChecksOnInstance(IInvokableInstance instance, 
final String... config)
+    {
+        assert config.length % 2 == 0;
+        return 
instance.callsOnInstance((IIsolatedExecutor.SerializableCallable<Throwable>) () 
->
+        {
+            try
+            {
+                DataResurrectionCheck check = new DataResurrectionCheck();
+                StartupChecksOptions startupChecksOptions = new 
StartupChecksOptions();
+                startupChecksOptions.enable(check_data_resurrection);
+
+                for (int i = 0; i < config.length - 1; i = i + 2)
+                    startupChecksOptions.set(check_data_resurrection, 
config[i], config[i + 1]);
+
+                check.execute(startupChecksOptions);
+                return null;
+            }
+            catch (StartupException e)
+            {
+                return e;
+            }
+        }).call();
+    }
+
+    private Map<StartupCheckType, Map<String, Object>> 
getStartupChecksConfig(String... configs)
+    {
+        return new EnumMap<StartupCheckType, Map<String, 
Object>>(StartupCheckType.class)
+        {{
+            put(check_data_resurrection,
+                new HashMap<String, Object>()
+                {{
+                    for (int i = 0; i < configs.length - 1; i = i + 2)
+                        put(configs[i], configs[i + 1]);
+                }});
+        }};
+    }
+
+    private void checkHeartbeat(IInvokableInstance instance) throws Exception
+    {
+        File heartbeatFile = new File(((String[]) 
instance.config().get("data_file_directories"))[0],
+                                      DEFAULT_HEARTBEAT_FILE);
+        assertTrue(heartbeatFile.exists());
+        Heartbeat heartbeat = Heartbeat.deserializeFromJsonFile(heartbeatFile);
+        assertNotNull(heartbeat.lastHeartbeat);
+        assertTrue(heartbeat.lastHeartbeat.toEpochMilli() < 
Global.currentTimeMillis());
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 011b322cba..1e484404b8 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -154,6 +154,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.compress.LZ4Compressor",
     "org.apache.cassandra.io.sstable.metadata.MetadataType",
     "org.apache.cassandra.io.util.BufferedDataOutputStreamPlus",
+    "org.apache.cassandra.io.util.RebufferingInputStream",
     "org.apache.cassandra.io.util.FileInputStreamPlus",
     "org.apache.cassandra.io.util.FileOutputStreamPlus",
     "org.apache.cassandra.io.util.File",
@@ -161,6 +162,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.util.DataOutputBufferFixed",
     "org.apache.cassandra.io.util.DataOutputStreamPlus",
     "org.apache.cassandra.io.util.DataOutputPlus",
+    "org.apache.cassandra.io.util.DataInputPlus",
     "org.apache.cassandra.io.util.DiskOptimizationStrategy",
     "org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy",
     "org.apache.cassandra.io.util.PathUtils$IOToLongFunction",
diff --git a/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java 
b/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java
index 8be5f92781..f613c0ecda 100644
--- a/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java
+++ b/test/unit/org/apache/cassandra/config/StartupCheckOptionsTest.java
@@ -21,13 +21,16 @@ package org.apache.cassandra.config;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.junit.Test;
 
+import org.apache.cassandra.service.DataResurrectionCheck;
 import org.apache.cassandra.service.StartupChecks.StartupCheckType;
+import org.apache.cassandra.utils.Pair;
 
 import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
 import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.non_configurable_check;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -39,7 +42,7 @@ public class StartupCheckOptionsTest
     public void testStartupOptionsConfigApplication()
     {
         Map<StartupCheckType, Map<String, Object>> config = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
-            put(filesystem_ownership, new HashMap<String, Object>() {{
+            put(check_filesystem_ownership, new HashMap<String, Object>() {{
                 put(ENABLED_PROPERTY, true);
                 put("key", "value");
             }});
@@ -47,18 +50,18 @@ public class StartupCheckOptionsTest
 
         StartupChecksOptions options = new StartupChecksOptions(config);
 
-        assertTrue(Boolean.parseBoolean(options.getConfig(filesystem_ownership)
+        
assertTrue(Boolean.parseBoolean(options.getConfig(check_filesystem_ownership)
                                                .get(ENABLED_PROPERTY)
                                                .toString()));
 
-        assertEquals("value", 
options.getConfig(filesystem_ownership).get("key"));
-        options.set(filesystem_ownership, "key", "value2");
-        assertEquals("value2", 
options.getConfig(filesystem_ownership).get("key"));
+        assertEquals("value", 
options.getConfig(check_filesystem_ownership).get("key"));
+        options.set(check_filesystem_ownership, "key", "value2");
+        assertEquals("value2", 
options.getConfig(check_filesystem_ownership).get("key"));
 
-        assertTrue(options.isEnabled(filesystem_ownership));
-        options.disable(filesystem_ownership);
-        assertFalse(options.isEnabled(filesystem_ownership));
-        assertTrue(options.isDisabled(filesystem_ownership));
+        assertTrue(options.isEnabled(check_filesystem_ownership));
+        options.disable(check_filesystem_ownership);
+        assertFalse(options.isEnabled(check_filesystem_ownership));
+        assertTrue(options.isDisabled(check_filesystem_ownership));
     }
 
     @Test
@@ -82,11 +85,11 @@ public class StartupCheckOptionsTest
     public void testEmptyDisabledValues()
     {
         Map<StartupCheckType, Map<String, Object>> emptyConfig = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
-            put(filesystem_ownership, new HashMap<>());
+            put(check_filesystem_ownership, new HashMap<>());
         }};
 
         Map<StartupCheckType, Map<String, Object>> emptyEnabledConfig = new 
EnumMap<StartupCheckType, Map<String, Object>>(StartupCheckType.class) {{
-            put(filesystem_ownership, new HashMap<String, Object>() {{
+            put(check_filesystem_ownership, new HashMap<String, Object>() {{
                 put(ENABLED_PROPERTY, null);
             }});
         }};
@@ -94,10 +97,10 @@ public class StartupCheckOptionsTest
         // empty enabled property or enabled property with null value are 
still counted as disabled
 
         StartupChecksOptions options1 = new StartupChecksOptions(emptyConfig);
-        assertTrue(options1.isDisabled(filesystem_ownership));
+        assertTrue(options1.isDisabled(check_filesystem_ownership));
 
         StartupChecksOptions options2 = new 
StartupChecksOptions(emptyEnabledConfig);
-        assertTrue(options2.isDisabled(filesystem_ownership));
+        assertTrue(options2.isDisabled(check_filesystem_ownership));
     }
 
     @Test
@@ -105,6 +108,43 @@ public class StartupCheckOptionsTest
     {
         Map<StartupCheckType, Map<String, Object>> emptyConfig = new 
EnumMap<>(StartupCheckType.class);
         StartupChecksOptions options = new StartupChecksOptions(emptyConfig);
-        assertTrue(options.isDisabled(filesystem_ownership));
+        assertTrue(options.isDisabled(check_filesystem_ownership));
+    }
+
+    @Test
+    public void testExcludedKeyspacesInDataResurrectionCheckOptions()
+    {
+        Map<String, Object> config = new HashMap<String, Object>(){{
+            put("excluded_keyspaces", "ks1,ks2,ks3");
+        }};
+        DataResurrectionCheck check = new DataResurrectionCheck();
+        check.getExcludedKeyspaces(config);
+
+        Set<String> excludedKeyspaces = check.getExcludedKeyspaces(config);
+        assertEquals(3, excludedKeyspaces.size());
+        assertTrue(excludedKeyspaces.contains("ks1"));
+        assertTrue(excludedKeyspaces.contains("ks2"));
+        assertTrue(excludedKeyspaces.contains("ks3"));
+    }
+
+    @Test
+    public void testExcludedTablesInDataResurrectionCheckOptions()
+    {
+        for (String input : new String[]{
+        "ks1.tb1,ks1.tb2,ks3.tb3",
+        " ks1 . tb1,  ks1 .tb2  ,ks3 .tb3  "
+        })
+        {
+            Map<String, Object> config = new HashMap<String, Object>(){{
+                put("excluded_tables", input);
+            }};
+
+            DataResurrectionCheck check = new DataResurrectionCheck();
+            Set<Pair<String, String>> excludedTables = 
check.getExcludedTables(config);
+            assertEquals(3, excludedTables.size());
+            assertTrue(excludedTables.contains(Pair.create("ks1", "tb1")));
+            assertTrue(excludedTables.contains(Pair.create("ks1", "tb2")));
+            assertTrue(excludedTables.contains(Pair.create("ks3", "tb3")));
+        }
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
index 11efc3c2c3..0fc8559985 100644
--- 
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
+++ 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
@@ -53,7 +53,7 @@ import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.TOKEN;
 import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.UNSUPPORTED_VERSION;
 import static org.apache.cassandra.service.FileSystemOwnershipCheck.VERSION;
 import static 
org.apache.cassandra.service.FileSystemOwnershipCheck.VOLUME_COUNT;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -195,7 +195,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     public void skipCheckDisabledIfSystemPropertyIsEmpty() throws Exception
     {
         // no exceptions thrown from the supplier because the check is skipped
-        options.disable(filesystem_ownership);
+        options.disable(check_filesystem_ownership);
         
System.clearProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getKey());
         AbstractFilesystemOwnershipCheckTest.checker(() -> { throw new 
RuntimeException("FAIL"); }).execute(options);
     }
@@ -204,7 +204,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     public void skipCheckDisabledIfSystemPropertyIsFalseButOptionsEnabled() 
throws Exception
     {
         // no exceptions thrown from the supplier because the check is skipped
-        options.enable(filesystem_ownership);
+        options.enable(check_filesystem_ownership);
         
System.setProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_ENABLE.getKey(),
 "false");
         AbstractFilesystemOwnershipCheckTest.checker(() -> { throw new 
RuntimeException("FAIL"); }).execute(options);
     }
@@ -219,7 +219,7 @@ public abstract class AbstractFilesystemOwnershipCheckTest
     @Test
     public void checkEnabledButClusterPropertyIsUnset()
     {
-        
Assume.assumeFalse(options.getConfig(filesystem_ownership).containsKey("ownership_token"));
+        
Assume.assumeFalse(options.getConfig(check_filesystem_ownership).containsKey("ownership_token"));
         
System.clearProperty(CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey());
         
AbstractFilesystemOwnershipCheckTest.executeAndFail(AbstractFilesystemOwnershipCheckTest.checker(tempDir),
 options, MISSING_PROPERTY, 
CassandraRelevantProperties.FILE_SYSTEM_CHECK_OWNERSHIP_TOKEN.getKey());
     }
diff --git a/test/unit/org/apache/cassandra/service/StartupChecksTest.java 
b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
index 2c15fdc8aa..4a63fc1eb2 100644
--- a/test/unit/org/apache/cassandra/service/StartupChecksTest.java
+++ b/test/unit/org/apache/cassandra/service/StartupChecksTest.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.List;
 
 import org.apache.cassandra.config.StartupChecksOptions;
 import org.apache.cassandra.io.util.File;
@@ -31,9 +33,17 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.StartupException;
-import org.apache.cassandra.io.util.FileUtils;
-
-import static org.junit.Assert.assertFalse;
+import org.apache.cassandra.service.DataResurrectionCheck.Heartbeat;
+import org.apache.cassandra.utils.Clock;
+
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Collections.singletonList;
+import static org.apache.cassandra.io.util.FileUtils.createTempFile;
+import static org.apache.cassandra.io.util.FileUtils.write;
+import static 
org.apache.cassandra.service.DataResurrectionCheck.HEARTBEAT_FILE_CONFIG_PROPERTY;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_data_resurrection;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,12 +52,14 @@ public class StartupChecksTest
     public static final String INVALID_LEGACY_SSTABLE_ROOT_PROP = 
"invalid-legacy-sstable-root";
     StartupChecks startupChecks;
     Path sstableDir;
+    static File heartbeatFile;
 
     StartupChecksOptions options = new StartupChecksOptions();
 
     @BeforeClass
     public static void setupServer()
     {
+        heartbeatFile = createTempFile("cassandra-heartbeat-", "");
         SchemaLoader.prepareServer();
     }
 
@@ -57,19 +69,29 @@ public class StartupChecksTest
         for (ColumnFamilyStore cfs : 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStores())
             cfs.clearUnsafe();
         for (File dataDir : 
Directories.getKSChildDirectories(SchemaConstants.SYSTEM_KEYSPACE_NAME))
-            FileUtils.deleteRecursive(dataDir);
+            dataDir.deleteRecursive();
 
         File dataDir = new 
File(DatabaseDescriptor.getAllDataFileLocations()[0]);
         sstableDir = Paths.get(dataDir.absolutePath(), "Keyspace1", 
"Standard1");
         Files.createDirectories(sstableDir);
 
+        options.enable(check_data_resurrection);
+        options.getConfig(check_data_resurrection)
+               .put(HEARTBEAT_FILE_CONFIG_PROPERTY, 
heartbeatFile.absolutePath());
+
         startupChecks = new StartupChecks();
     }
 
     @After
     public void tearDown() throws IOException
     {
-        FileUtils.deleteRecursive(new File(sstableDir));
+        new File(sstableDir).deleteRecursive();
+    }
+
+    @AfterClass
+    public static void tearDownClass()
+    {
+        heartbeatFile.delete();
     }
 
     @Test
@@ -82,13 +104,13 @@ public class StartupChecksTest
         verifyFailure(startupChecks, "Detected unreadable sstables");
 
         // we should ignore invalid sstables in a snapshots directory
-        FileUtils.deleteRecursive(new File(sstableDir));
+        new File(sstableDir).deleteRecursive();
         Path snapshotDir = sstableDir.resolve("snapshots");
         Files.createDirectories(snapshotDir);
         copyInvalidLegacySSTables(snapshotDir); startupChecks.verify(options);
 
         // and in a backups directory
-        FileUtils.deleteRecursive(new File(sstableDir));
+        new File(sstableDir).deleteRecursive();
         Path backupDir = sstableDir.resolve("backups");
         Files.createDirectories(backupDir);
         copyInvalidLegacySSTables(backupDir);
@@ -101,7 +123,7 @@ public class StartupChecksTest
         startupChecks = 
startupChecks.withTest(StartupChecks.checkSSTablesFormat);
 
         copyLegacyNonSSTableFiles(sstableDir);
-        assertFalse(new File(sstableDir).tryList().length == 0);
+        assertNotEquals(0, new File(sstableDir).tryList().length);
 
         startupChecks.verify(options);
     }
@@ -150,6 +172,33 @@ public class StartupChecksTest
             Files.copy(Paths.get(legacySSTableRoot.toString(), filename), 
targetDir.resolve(filename));
     }
 
+    @Test
+    public void testDataResurrectionCheck() throws Exception
+    {
+        DataResurrectionCheck check = new DataResurrectionCheck() {
+            @Override
+            List<String> getKeyspaces()
+            {
+                return singletonList("abc");
+            }
+
+            @Override
+            List<TableGCPeriod> getTablesGcPeriods(String userKeyspace)
+            {
+                return singletonList(new TableGCPeriod("def", 10));
+            }
+        };
+
+        Heartbeat heartbeat = new 
Heartbeat(Instant.ofEpochMilli(Clock.Global.currentTimeMillis()));
+        heartbeat.serializeToJsonFile(heartbeatFile);
+
+        Thread.sleep(15 * 1000);
+
+        startupChecks.withTest(check);
+
+        verifyFailure(startupChecks, "Invalid tables: abc.def");
+    }
+
     private void copyInvalidLegacySSTables(Path targetDir) throws IOException
     {
         File legacySSTableRoot = new 
File(Paths.get(System.getProperty(INVALID_LEGACY_SSTABLE_ROOT_PROP),
diff --git 
a/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
index cc00fd6b11..9b962e9225 100644
--- 
a/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
+++ 
b/test/unit/org/apache/cassandra/service/YamlBasedFileSystemOwnershipCheckTest.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.service;
 import org.junit.Before;
 
 import static 
org.apache.cassandra.config.StartupChecksOptions.ENABLED_PROPERTY;
-import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.filesystem_ownership;
+import static 
org.apache.cassandra.service.StartupChecks.StartupCheckType.check_filesystem_ownership;
 
 public class YamlBasedFileSystemOwnershipCheckTest extends 
AbstractFilesystemOwnershipCheckTest
 {
@@ -29,7 +29,7 @@ public class YamlBasedFileSystemOwnershipCheckTest extends 
AbstractFilesystemOwn
     public void setup()
     {
         super.setup();
-        options.getConfig(filesystem_ownership).put(ENABLED_PROPERTY, "true");
-        options.getConfig(filesystem_ownership).put("ownership_token", token);
+        options.getConfig(check_filesystem_ownership).put(ENABLED_PROPERTY, 
"true");
+        options.getConfig(check_filesystem_ownership).put("ownership_token", 
token);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to