Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 860cde74a -> d4df271f7


Add 'die' policy for commit log and disk failure

patch by John Sumsion and Josh McKenzie, reviewed by blerer for CASSANDRA-7927


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4df271f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4df271f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4df271f

Branch: refs/heads/cassandra-2.1
Commit: d4df271f754ac4d5afd785d4043ccaaeb907a6ad
Parents: 860cde7
Author: Joshua McKenzie <jmcken...@apache.org>
Authored: Fri Oct 24 12:27:20 2014 -0500
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Fri Oct 24 12:27:20 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 +-
 .../org/apache/cassandra/config/Config.java     |  2 +
 .../cassandra/db/commitlog/CommitLog.java       | 12 ++++-
 .../org/apache/cassandra/io/util/FileUtils.java | 10 +++-
 .../cassandra/utils/JVMStabilityInspector.java  | 50 ++++++++++++++++++-
 .../org/apache/cassandra/db/CommitLogTest.java  | 45 +++++++++++------
 .../utils/JVMStabilityInspectorTest.java        | 51 ++++++++++++++++++++
 .../apache/cassandra/utils/KillerForTests.java  | 43 +++++++++++++++++
 9 files changed, 197 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c136c5e..4ed07a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Add 'die' policy for commit log and disk failure (CASSANDRA-7927)
  * Fix installing as service on Windows (CASSANDRA-8115)
  * Fix CREATE TABLE for CQL2 (CASSANDRA-8144)
  * Avoid boxing in ColumnStats min/max trackers (CASSANDRA-8109)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c95c68c..74a8cfb 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -104,6 +104,8 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # commitlog_directory: /var/lib/cassandra/commitlog
 
 # policy for data disk failures:
+# die: shut down gossip and Thrift and kill the JVM for any fs errors or
+#      single-sstable errors, so the node can be replaced.
 # stop_paranoid: shut down gossip and Thrift even for single-sstable errors.
 # stop: shut down gossip and Thrift, leaving the node effectively dead, but
 #       can still be inspected via JMX.
@@ -114,9 +116,10 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 disk_failure_policy: stop
 
 # policy for commit disk failures:
+# die: shut down gossip and Thrift and kill the JVM, so the node can be replaced.
 # stop: shut down gossip and Thrift, leaving the node effectively dead, but
 #       can still be inspected via JMX.
-# stop_commit: shutdown the commit log, letting writes collect but 
+# stop_commit: shutdown the commit log, letting writes collect but
 #              continuing to service reads, as in pre-2.0.5 Cassandra
 # ignore: ignore fatal errors and let the batches fail
 commit_failure_policy: stop

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index e2df89f..5f16239 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -299,6 +299,7 @@ public class Config
         stop,
         ignore,
         stop_paranoid,
+        die
     }
 
     public static enum CommitFailurePolicy
@@ -306,6 +307,7 @@ public class Config
         stop,
         stop_commit,
         ignore,
+        die,
     }
 
     public static enum RequestSchedulerId

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index d38c4ed..ee9ca14 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -25,9 +25,12 @@ import java.util.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.commons.lang3.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -36,6 +39,7 @@ import org.apache.cassandra.io.util.DataOutputByteBuffer;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.PureJavaCrc32;
 
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
@@ -349,10 +353,14 @@ public class CommitLog implements CommitLogMBean
         return allocator.getActiveSegments().size();
     }
 
-    static boolean handleCommitError(String message, Throwable t)
+    @VisibleForTesting
+    public static boolean handleCommitError(String message, Throwable t)
     {
+        JVMStabilityInspector.inspectCommitLogThrowable(t);
         switch (DatabaseDescriptor.getCommitFailurePolicy())
         {
+            // Needed here for unit tests to not fail on default assertion
+            case die:
             case stop:
                 StorageService.instance.stopTransports();
             case stop_commit:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index e590918..295679e 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -395,12 +395,18 @@ public class FileUtils
 
     public static void handleCorruptSSTable(CorruptSSTableException e)
     {
-        if (DatabaseDescriptor.getDiskFailurePolicy() == 
Config.DiskFailurePolicy.stop_paranoid)
-            StorageService.instance.stopTransports();
+        JVMStabilityInspector.inspectThrowable(e);
+        switch (DatabaseDescriptor.getDiskFailurePolicy())
+        {
+            case stop_paranoid:
+                StorageService.instance.stopTransports();
+                break;
+        }
     }
     
     public static void handleFSError(FSError e)
     {
+        JVMStabilityInspector.inspectThrowable(e);
         switch (DatabaseDescriptor.getDiskFailurePolicy())
         {
             case stop_paranoid:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 9fdc5ea..bcff172 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -17,15 +17,28 @@
  */
 package org.apache.cassandra.utils;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.service.StorageService;
 
-public class JVMStabilityInspector
+/**
+ * Responsible for deciding whether to kill the JVM if it gets in an "unstable" state (think OOM).
+ */
+public final class JVMStabilityInspector
 {
     private static final Logger logger = 
LoggerFactory.getLogger(JVMStabilityInspector.class);
+    private static Killer killer = new Killer();
+
+    private JVMStabilityInspector() {}
+
     /**
-     * Certain Throwables and Exceptions represent "Stop" conditions for the server.
+     * Certain Throwables and Exceptions represent "Die" conditions for the server.
      * @param t
      *      The Throwable to check for server-stop conditions
      */
@@ -34,7 +47,40 @@ public class JVMStabilityInspector
         boolean isUnstable = false;
         if (t instanceof OutOfMemoryError)
             isUnstable = true;
+
+        if (DatabaseDescriptor.getDiskFailurePolicy() == 
Config.DiskFailurePolicy.die)
+            if (t instanceof FSError || t instanceof CorruptSSTableException)
+                isUnstable = true;
+
         if (isUnstable)
+            killer.killCurrentJVM(t);
+    }
+
+    public static void inspectCommitLogThrowable(Throwable t)
+    {
+        if (DatabaseDescriptor.getCommitFailurePolicy() == 
Config.CommitFailurePolicy.die)
+            killer.killCurrentJVM(t);
+        else
+            inspectThrowable(t);
+    }
+
+    @VisibleForTesting
+    public static Killer replaceKiller(Killer newKiller) {
+        Killer oldKiller = JVMStabilityInspector.killer;
+        JVMStabilityInspector.killer = newKiller;
+        return oldKiller;
+    }
+
+    @VisibleForTesting
+    public static class Killer
+    {
+        /**
+        * Certain situations represent "Die" conditions for the server, and if so, the reason is logged and the current JVM is killed.
+        *
+        * @param t
+        *      The Throwable to log before killing the current JVM
+        */
+        protected void killCurrentJVM(Throwable t)
         {
             t.printStackTrace(System.err);
             logger.error("JVM state determined to be unstable.  Exiting 
forcefully due to:", t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index ed9601d..8a1bb0c 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -23,11 +23,9 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -42,10 +40,14 @@ import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -275,29 +277,42 @@ public class CommitLogTest extends SchemaLoader
     }
 
     @Test
-    public void testCommitFailurePolicy_stop()
+    public void testCommitFailurePolicy_stop() throws ConfigurationException
     {
-        File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+        // Need storage service active so stop policy can shutdown gossip
+        StorageService.instance.initServer();
+        Assert.assertTrue(Gossiper.instance.isEnabled());
 
+        Config.CommitFailurePolicy oldPolicy = 
DatabaseDescriptor.getCommitFailurePolicy();
         try
         {
-
             
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
-            commitDir.setWritable(false);
-            Mutation rm = new Mutation("Keyspace1", bytes("k"));
-            rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 
0);
+            CommitLog.handleCommitError("Test stop error", new Throwable());
+            Assert.assertFalse(Gossiper.instance.isEnabled());
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+        }
+    }
 
-            // Adding it twice (won't change segment)
-            CommitLog.instance.add(rm);
-            Uninterruptibles.sleepUninterruptibly((int) 
DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
-            Assert.assertFalse(StorageService.instance.isRPCServerRunning());
-            
Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
-            Assert.assertFalse(StorageService.instance.isInitialized());
+    @Test
+    public void testCommitFailurePolicy_die()
+    {
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = 
JVMStabilityInspector.replaceKiller(killerForTests);
+        Config.CommitFailurePolicy oldPolicy = 
DatabaseDescriptor.getCommitFailurePolicy();
 
+        try
+        {
+            
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+            CommitLog.handleCommitError("Testing die policy", new Throwable());
+            Assert.assertTrue(killerForTests.wasKilled());
         }
         finally
         {
-            commitDir.setWritable(true);
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
new file mode 100644
index 0000000..e2a5107
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
@@ -0,0 +1,51 @@
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class JVMStabilityInspectorTest
+{
+    @Test
+    public void testKill() throws Exception
+    {
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = 
JVMStabilityInspector.replaceKiller(killerForTests);
+
+        Config.DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
+        Config.CommitFailurePolicy oldCommitPolicy = 
DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            killerForTests.reset();
+            JVMStabilityInspector.inspectThrowable(new IOException());
+            assertFalse(killerForTests.wasKilled());
+
+            killerForTests.reset();
+            JVMStabilityInspector.inspectThrowable(new OutOfMemoryError());
+            assertTrue(killerForTests.wasKilled());
+
+            
DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die);
+            killerForTests.reset();
+            JVMStabilityInspector.inspectThrowable(new FSReadError(new 
IOException(), "blah"));
+            assertTrue(killerForTests.wasKilled());
+
+            
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+            killerForTests.reset();
+            JVMStabilityInspector.inspectCommitLogThrowable(new Throwable());
+            assertTrue(killerForTests.wasKilled());
+        }
+        finally
+        {
+            JVMStabilityInspector.replaceKiller(originalKiller);
+            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+            DatabaseDescriptor.setCommitFailurePolicy(oldCommitPolicy);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4df271f/test/unit/org/apache/cassandra/utils/KillerForTests.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/KillerForTests.java 
b/test/unit/org/apache/cassandra/utils/KillerForTests.java
new file mode 100644
index 0000000..83cd7fc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+/**
+ * Responsible for stubbing out the System.exit() logic during unit tests.
+ */
+public class KillerForTests extends JVMStabilityInspector.Killer
+{
+    private boolean killed = false;
+
+    @Override
+    protected void killCurrentJVM(Throwable t)
+    {
+        killed = true;
+    }
+
+    public boolean wasKilled()
+    {
+        return killed;
+    }
+
+    public void reset()
+    {
+        killed = false;
+    }
+}

Reply via email to