Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 19c00f721 -> 1ecccede7
Handle corrupt files on startup Patch by stefania; reviewed by marcuse for CASSANDRA-9686 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1ecccede Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1ecccede Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1ecccede Branch: refs/heads/cassandra-2.1 Commit: 1ecccede7d9252dd4eacdc31da6b5120d0fded9c Parents: 19c00f7 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Wed Jul 1 11:15:15 2015 +0800 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Jul 13 19:03:37 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 9 +- .../cassandra/io/sstable/SSTableReader.java | 137 +++++++++++++------ .../org/apache/cassandra/io/util/FileUtils.java | 22 +++ .../cassandra/service/CassandraDaemon.java | 9 +- .../cassandra/service/StorageService.java | 7 + .../cassandra/utils/JVMStabilityInspector.java | 17 ++- 7 files changed, 150 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7635227..5f4fdf2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.9 + * Handle corrupt files on startup (CASSANDRA-9686) * Fix clientutil jar and tests (CASSANDRA-9760) * (cqlsh) Allow the SSL protocol version to be specified through the config file or environment variables (CASSANDRA-9544) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index e4b7cbd..3047586 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -112,11 +112,12 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner # commitlog_directory: /var/lib/cassandra/commitlog # policy for data disk failures: -# die: shut down gossip and Thrift and kill the JVM for any fs errors or +# die: shut down gossip and client transports and kill the JVM for any fs errors or # single-sstable errors, so the node can be replaced. -# stop_paranoid: shut down gossip and Thrift even for single-sstable errors. -# stop: shut down gossip and Thrift, leaving the node effectively dead, but -# can still be inspected via JMX. +# stop_paranoid: shut down gossip and client transports even for single-sstable errors, +# kill the JVM for errors during startup. +# stop: shut down gossip and client transports, leaving the node effectively dead, but +# can still be inspected via JMX, kill the JVM for errors during startup. # best_effort: stop using the failed disk and respond to requests based on # remaining available sstables. This means you WILL see obsolete # data at CL.ONE! http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 6879834..92c9b55 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -69,6 +69,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.metadata.CompactionMetadata; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; @@ -477,19 +478,27 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead statsMetadata, OpenReason.NORMAL); - // load index and filter - long start = System.nanoTime(); - sstable.load(validationMetadata); - logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + try + { + // load index and filter + long start = System.nanoTime(); + sstable.load(validationMetadata); + logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - sstable.setup(!validate); - if (validate) - sstable.validate(); + sstable.setup(!validate); + if (validate) + sstable.validate(); - if (sstable.getKeyCache() != null) - logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity()); + if (sstable.getKeyCache() != null) + logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity()); - return sstable; + return sstable; + } + catch (Throwable t) + { + sstable.selfRef().release(); + throw t; + } } public static void logOpenException(Descriptor descriptor, IOException e) @@ -518,9 +527,21 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead { sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner); } + catch (CorruptSSTableException ex) + { + FileUtils.handleCorruptSSTable(ex); + logger.error("Corrupt sstable {}; skipping table", entry, ex); + return; + } + catch (FSError ex) + { + FileUtils.handleFSError(ex); + logger.error("Cannot read sstable {}; file system error, skipping table", entry, ex); + return; + } catch (IOException ex) { - logger.error("Corrupt sstable {}; skipped", entry, ex); + logger.error("Cannot read sstable {}; other IO error, skipping table", entry, ex); return; } sstables.add(sstable); @@ -704,46 +725,71 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead */ private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException { - SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); - SegmentedFile.Builder dbuilder = compression + try + { + SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + SegmentedFile.Builder dbuilder = compression ? SegmentedFile.getCompressedBuilder() : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - boolean summaryLoaded = loadSummary(ibuilder, dbuilder); - boolean builtSummary = false; - if (recreateBloomFilter || !summaryLoaded) - { - buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL); - builtSummary = true; - } - - if (components.contains(Component.PRIMARY_INDEX)) - ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); - - dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + boolean summaryLoaded = loadSummary(ibuilder, dbuilder); + boolean builtSummary = false; + if (recreateBloomFilter || !summaryLoaded) + { + buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL); + builtSummary = true; + } - // Check for an index summary that was downsampled even though the serialization format doesn't support - // that. If it was downsampled, rebuild it. See CASSANDRA-8993 for details. - if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel() && ifile != null) - { - indexSummary.close(); - ifile.close(); - dfile.close(); + if (components.contains(Component.PRIMARY_INDEX)) + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); - logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling"); - FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY))); - ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); - dbuilder = compression - ? SegmentedFile.getCompressedBuilder() - : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL); - ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); - saveSummary(ibuilder, dbuilder); + + // Check for an index summary that was downsampled even though the serialization format doesn't support + // that. If it was downsampled, rebuild it. See CASSANDRA-8993 for details. + if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel() && ifile != null) + { + indexSummary.close(); + ifile.close(); + dfile.close(); + + logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling"); + FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY))); + ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + dbuilder = compression + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL); + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + saveSummary(ibuilder, dbuilder); + } + else if (saveSummaryIfCreated && builtSummary) + { + saveSummary(ibuilder, dbuilder); + } } - else if (saveSummaryIfCreated && builtSummary) - { - saveSummary(ibuilder, dbuilder); + catch (Throwable t) + { // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error + if (ifile != null) + { + ifile.close(); + ifile = null; + } + + if (dfile != null) + { + dfile.close(); + dfile = null; + } + + if (indexSummary != null) + { + indexSummary.close(); + indexSummary = null; + } + + throw t; } } @@ -2174,7 +2220,8 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead summary.close(); if (runOnClose != null) runOnClose.run(); - dfile.close(); + if (dfile != null) + dfile.close(); if (ifile != null) ifile.close(); typeRef.release(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 7d187ac..3be7c99 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -395,6 +395,9 @@ public class FileUtils public static void handleCorruptSSTable(CorruptSSTableException e) { + if (!StorageService.instance.isSetupCompleted()) + handleStartupFSError(e); + JVMStabilityInspector.inspectThrowable(e); switch (DatabaseDescriptor.getDiskFailurePolicy()) { @@ -406,6 +409,9 @@ public class FileUtils public static void handleFSError(FSError e) { + if (!StorageService.instance.isSetupCompleted()) + handleStartupFSError(e); + JVMStabilityInspector.inspectThrowable(e); switch (DatabaseDescriptor.getDiskFailurePolicy()) { @@ -431,6 +437,22 @@ public class FileUtils } } + private static void handleStartupFSError(Throwable t) + { + switch (DatabaseDescriptor.getDiskFailurePolicy()) + { + case stop_paranoid: + case stop: + case die: + logger.error("Exiting forcefully due to file system exception on startup, disk failure policy \"{}\"", + DatabaseDescriptor.getDiskFailurePolicy(), + t); + JVMStabilityInspector.killCurrentJVM(t, true); + break; + default: + break; + } + } /** * Get the size of a directory in bytes * @param directory The directory for which we need size. http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index f66e523..949ea4c 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -134,7 +134,7 @@ public class CassandraDaemon public Server thriftServer; public Server nativeServer; - + private boolean setupCompleted = false; /** * This is a hook for concrete daemons to initialize themselves suitably. * @@ -425,6 +425,13 @@ public class CassandraDaemon InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress(); int nativePort = DatabaseDescriptor.getNativeTransportPort(); nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort); + + setupCompleted = true; + } + + public boolean setupCompleted() + { + return setupCompleted; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e4ffc5b..4a32bad 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -486,6 +486,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return initialized; } + public boolean isSetupCompleted() + { + return daemon == null + ? false + : daemon.setupCompleted(); + } + public void stopDaemon() { if (daemon == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ecccede/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 2883ab3..c0ab84f 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -72,6 +72,11 @@ public final class JVMStabilityInspector inspectThrowable(t); } + public static void killCurrentJVM(Throwable t, boolean quiet) + { + killer.killCurrentJVM(t, quiet); + } + @VisibleForTesting public static Killer replaceKiller(Killer newKiller) { Killer oldKiller = JVMStabilityInspector.killer; @@ -90,8 +95,16 @@ public final class JVMStabilityInspector */ protected void killCurrentJVM(Throwable t) { - t.printStackTrace(System.err); - logger.error("JVM state determined to be unstable. Exiting forcefully due to:", t); + killCurrentJVM(t, false); + } + + protected void killCurrentJVM(Throwable t, boolean quiet) + { + if (!quiet) + { + t.printStackTrace(System.err); + logger.error("JVM state determined to be unstable. Exiting forcefully due to:", t); + } StorageService.instance.removeShutdownHook(); System.exit(100); }