Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 19c00f721 -> 1ecccede7


Handle corrupt files on startup

Patch by stefania; reviewed by marcuse for CASSANDRA-9686


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1ecccede
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1ecccede
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1ecccede

Branch: refs/heads/cassandra-2.1
Commit: 1ecccede7d9252dd4eacdc31da6b5120d0fded9c
Parents: 19c00f7
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Wed Jul 1 11:15:15 2015 +0800
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Jul 13 19:03:37 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   9 +-
 .../cassandra/io/sstable/SSTableReader.java     | 137 +++++++++++++------
 .../org/apache/cassandra/io/util/FileUtils.java |  22 +++
 .../cassandra/service/CassandraDaemon.java      |   9 +-
 .../cassandra/service/StorageService.java       |   7 +
 .../cassandra/utils/JVMStabilityInspector.java  |  17 ++-
 7 files changed, 150 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7635227..5f4fdf2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.9
+ * Handle corrupt files on startup (CASSANDRA-9686)
  * Fix clientutil jar and tests (CASSANDRA-9760)
  * (cqlsh) Allow the SSL protocol version to be specified through the
    config file or environment variables (CASSANDRA-9544)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e4b7cbd..3047586 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -112,11 +112,12 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # commitlog_directory: /var/lib/cassandra/commitlog
 
 # policy for data disk failures:
-# die: shut down gossip and Thrift and kill the JVM for any fs errors or
+# die: shut down gossip and client transports and kill the JVM for any fs 
errors or
 #      single-sstable errors, so the node can be replaced.
-# stop_paranoid: shut down gossip and Thrift even for single-sstable errors.
-# stop: shut down gossip and Thrift, leaving the node effectively dead, but
-#       can still be inspected via JMX.
+# stop_paranoid: shut down gossip and client transports even for 
single-sstable errors,
+#                kill the JVM for errors during startup.
+# stop: shut down gossip and client transports, leaving the node effectively 
dead, but
+#       can still be inspected via JMX, kill the JVM for errors during startup.
 # best_effort: stop using the failed disk and respond to requests based on
 #              remaining available sstables.  This means you WILL see obsolete
 #              data at CL.ONE!

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 6879834..92c9b55 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -69,6 +69,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
@@ -477,19 +478,27 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
                                                   statsMetadata,
                                                   OpenReason.NORMAL);
 
-        // load index and filter
-        long start = System.nanoTime();
-        sstable.load(validationMetadata);
-        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+        try
+        {
+            // load index and filter
+            long start = System.nanoTime();
+            sstable.load(validationMetadata);
+            logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
-        sstable.setup(!validate);
-        if (validate)
-            sstable.validate();
+            sstable.setup(!validate);
+            if (validate)
+                sstable.validate();
 
-        if (sstable.getKeyCache() != null)
-            logger.debug("key cache contains {}/{} keys", 
sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
+            if (sstable.getKeyCache() != null)
+                logger.debug("key cache contains {}/{} keys", 
sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 
-        return sstable;
+            return sstable;
+        }
+        catch (Throwable t)
+        {
+            sstable.selfRef().release();
+            throw t;
+        }
     }
 
     public static void logOpenException(Descriptor descriptor, IOException e)
@@ -518,9 +527,21 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
                     {
                         sstable = open(entry.getKey(), entry.getValue(), 
metadata, partitioner);
                     }
+                    catch (CorruptSSTableException ex)
+                    {
+                        FileUtils.handleCorruptSSTable(ex);
+                        logger.error("Corrupt sstable {}; skipping table", 
entry, ex);
+                        return;
+                    }
+                    catch (FSError ex)
+                    {
+                        FileUtils.handleFSError(ex);
+                        logger.error("Cannot read sstable {}; file system 
error, skipping table", entry, ex);
+                        return;
+                    }
                     catch (IOException ex)
                     {
-                        logger.error("Corrupt sstable {}; skipped", entry, ex);
+                        logger.error("Cannot read sstable {}; other IO error, 
skipping table", entry, ex);
                         return;
                     }
                     sstables.add(sstable);
@@ -704,46 +725,71 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
      */
     private void load(boolean recreateBloomFilter, boolean 
saveSummaryIfCreated) throws IOException
     {
-        SegmentedFile.Builder ibuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-        SegmentedFile.Builder dbuilder = compression
+        try
+        {
+            SegmentedFile.Builder ibuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+            SegmentedFile.Builder dbuilder = compression
                                          ? SegmentedFile.getCompressedBuilder()
                                          : 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
-        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
-        boolean builtSummary = false;
-        if (recreateBloomFilter || !summaryLoaded)
-        {
-            buildSummary(recreateBloomFilter, ibuilder, dbuilder, 
summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
-            builtSummary = true;
-        }
-
-        if (components.contains(Component.PRIMARY_INDEX))
-            ifile = 
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-
-        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+            boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+            boolean builtSummary = false;
+            if (recreateBloomFilter || !summaryLoaded)
+            {
+                buildSummary(recreateBloomFilter, ibuilder, dbuilder, 
summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+                builtSummary = true;
+            }
 
-        // Check for an index summary that was downsampled even though the 
serialization format doesn't support
-        // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 for 
details.
-        if (!descriptor.version.hasSamplingLevel && !builtSummary && 
!validateSummarySamplingLevel() && ifile != null)
-        {
-            indexSummary.close();
-            ifile.close();
-            dfile.close();
+            if (components.contains(Component.PRIMARY_INDEX))
+                ifile = 
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 
-            logger.info("Detected erroneously downsampled index summary; will 
rebuild summary at full sampling");
-            FileUtils.deleteWithConfirm(new 
File(descriptor.filenameFor(Component.SUMMARY)));
-            ibuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-            dbuilder = compression
-                       ? SegmentedFile.getCompressedBuilder()
-                       : 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-            buildSummary(false, ibuilder, dbuilder, false, 
Downsampling.BASE_SAMPLING_LEVEL);
-            ifile = 
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
             dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-            saveSummary(ibuilder, dbuilder);
+
+            // Check for an index summary that was downsampled even though the 
serialization format doesn't support
+            // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 
for details.
+            if (!descriptor.version.hasSamplingLevel && !builtSummary && 
!validateSummarySamplingLevel() && ifile != null)
+            {
+                indexSummary.close();
+                ifile.close();
+                dfile.close();
+
+                logger.info("Detected erroneously downsampled index summary; 
will rebuild summary at full sampling");
+                FileUtils.deleteWithConfirm(new 
File(descriptor.filenameFor(Component.SUMMARY)));
+                ibuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+                dbuilder = compression
+                           ? SegmentedFile.getCompressedBuilder()
+                           : 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                buildSummary(false, ibuilder, dbuilder, false, 
Downsampling.BASE_SAMPLING_LEVEL);
+                ifile = 
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+                dfile = 
dbuilder.complete(descriptor.filenameFor(Component.DATA));
+                saveSummary(ibuilder, dbuilder);
+            }
+            else if (saveSummaryIfCreated && builtSummary)
+            {
+                saveSummary(ibuilder, dbuilder);
+            }
         }
-        else if (saveSummaryIfCreated && builtSummary)
-        {
-            saveSummary(ibuilder, dbuilder);
+        catch (Throwable t)
+        { // Because the tidier has not been set-up yet in 
SSTableReader.open(), we must release the files in case of error
+            if (ifile != null)
+            {
+                ifile.close();
+                ifile = null;
+            }
+
+            if (dfile != null)
+            {
+                dfile.close();
+                dfile = null;
+            }
+
+            if (indexSummary != null)
+            {
+                indexSummary.close();
+                indexSummary = null;
+            }
+
+            throw t;
         }
     }
 
@@ -2174,7 +2220,8 @@ public class SSTableReader extends SSTable implements 
SelfRefCounted<SSTableRead
                         summary.close();
                     if (runOnClose != null)
                         runOnClose.run();
-                    dfile.close();
+                    if (dfile != null)
+                        dfile.close();
                     if (ifile != null)
                         ifile.close();
                     typeRef.release();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java 
b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 7d187ac..3be7c99 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -395,6 +395,9 @@ public class FileUtils
 
     public static void handleCorruptSSTable(CorruptSSTableException e)
     {
+        if (!StorageService.instance.isSetupCompleted())
+            handleStartupFSError(e);
+
         JVMStabilityInspector.inspectThrowable(e);
         switch (DatabaseDescriptor.getDiskFailurePolicy())
         {
@@ -406,6 +409,9 @@ public class FileUtils
     
     public static void handleFSError(FSError e)
     {
+        if (!StorageService.instance.isSetupCompleted())
+            handleStartupFSError(e);
+
         JVMStabilityInspector.inspectThrowable(e);
         switch (DatabaseDescriptor.getDiskFailurePolicy())
         {
@@ -431,6 +437,22 @@ public class FileUtils
         }
     }
 
+    private static void handleStartupFSError(Throwable t)
+    {
+        switch (DatabaseDescriptor.getDiskFailurePolicy())
+        {
+            case stop_paranoid:
+            case stop:
+            case die:
+                logger.error("Exiting forcefully due to file system exception 
on startup, disk failure policy \"{}\"",
+                             DatabaseDescriptor.getDiskFailurePolicy(),
+                             t);
+                JVMStabilityInspector.killCurrentJVM(t, true);
+                break;
+            default:
+                break;
+        }
+    }
     /**
      * Get the size of a directory in bytes
      * @param directory The directory for which we need size.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f66e523..949ea4c 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -134,7 +134,7 @@ public class CassandraDaemon
 
     public Server thriftServer;
     public Server nativeServer;
-
+    private boolean setupCompleted = false;
     /**
      * This is a hook for concrete daemons to initialize themselves suitably.
      *
@@ -425,6 +425,13 @@ public class CassandraDaemon
         InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
         int nativePort = DatabaseDescriptor.getNativeTransportPort();
         nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, 
nativePort);
+
+        setupCompleted = true;
+    }
+
+    public boolean setupCompleted()
+    {
+        return setupCompleted;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index e4ffc5b..4a32bad 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -486,6 +486,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return initialized;
     }
 
+    public boolean isSetupCompleted()
+    {
+        return daemon == null
+               ? false
+               : daemon.setupCompleted();
+    }
+
     public void stopDaemon()
     {
         if (daemon == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java 
b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 2883ab3..c0ab84f 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -72,6 +72,11 @@ public final class JVMStabilityInspector
             inspectThrowable(t);
     }
 
+    public static void killCurrentJVM(Throwable t, boolean quiet)
+    {
+        killer.killCurrentJVM(t, quiet);
+    }
+
     @VisibleForTesting
     public static Killer replaceKiller(Killer newKiller) {
         Killer oldKiller = JVMStabilityInspector.killer;
@@ -90,8 +95,16 @@ public final class JVMStabilityInspector
         */
         protected void killCurrentJVM(Throwable t)
         {
-            t.printStackTrace(System.err);
-            logger.error("JVM state determined to be unstable.  Exiting 
forcefully due to:", t);
+            killCurrentJVM(t, false);
+        }
+
+        protected void killCurrentJVM(Throwable t, boolean quiet)
+        {
+            if (!quiet)
+            {
+                t.printStackTrace(System.err);
+                logger.error("JVM state determined to be unstable.  Exiting 
forcefully due to:", t);
+            }
             StorageService.instance.removeShutdownHook();
             System.exit(100);
         }

Reply via email to