Fix out-of-space error treatment in memtable flushing

Patch by Branimir Lambov; reviewed by Joshua McKenzie for
CASSANDRA-11448


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3b3c410
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3b3c410
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3b3c410

Branch: refs/heads/cassandra-2.2
Commit: f3b3c410a0d84a4348cf05954b38df6b087762a7
Parents: 105fbb3
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Mar 30 16:29:55 2016 +0300
Committer: Josh McKenzie <josh.mcken...@datastax.com>
Committed: Fri Apr 1 11:45:18 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  24 ++-
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 .../db/commitlog/CommitLogSegmentManager.java   |   4 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |   5 +-
 .../apache/cassandra/cql3/OutOfSpaceTest.java   | 157 +++++++++++++++++++
 6 files changed, 185 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af2518c..50bc894 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.14
+ * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448)
  * Backport CASSANDRA-10859 (CASSANDRA-11415)
  * COPY FROM fails when importing blob (CASSANDRA-11375)
  * Backport CASSANDRA-10679 (CASSANDRA-9598)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3d66d3a..edc3564 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -967,6 +967,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
         final ReplayPosition lastReplayPosition;
+        volatile FSWriteError flushFailure = null;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier 
writeBarrier, ReplayPosition lastReplayPosition)
         {
@@ -1009,12 +1010,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             // must check lastReplayPosition != null because Flush may find that all memtables are clean
             // and so not set a lastReplayPosition
-            if (lastReplayPosition != null)
+            // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
+            if (lastReplayPosition != null && flushFailure == null)
             {
                 CommitLog.instance.discardCompletedSegments(metadata.cfId, 
lastReplayPosition);
             }
 
             metric.pendingFlushes.dec();
+
+            if (flushFailure != null)
+                throw flushFailure;
         }
     }
 
@@ -1114,11 +1119,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             metric.memtableSwitchCount.inc();
 
-            for (Memtable memtable : memtables)
+            try
+            {
+                for (Memtable memtable : memtables)
+                {
+                    // flush the memtable
+                    
MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+                    reclaim(memtable);
+                }
+            }
+            catch (FSWriteError e)
             {
-                // flush the memtable
-                
MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
-                reclaim(memtable);
+                JVMStabilityInspector.inspectThrowable(e);
+                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
+                postFlush.flushFailure = e;
             }
 
             // signal the post-flush we've done our work

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 810c336..35aa447 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -310,7 +310,7 @@ public class Directories
             if (tooBig)
                 return null;
             else
-                throw new IOError(new IOException("All configured data 
directories have been blacklisted as unwritable for erroring out"));
+                throw new FSWriteError(new IOException("All configured data 
directories have been blacklisted as unwritable for erroring out"), "");
 
         // shortcut for single data directory systems
         if (candidates.size() == 1)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 0ea54ab..6838de6 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -554,7 +555,8 @@ public class CommitLogSegmentManager
     /**
      * @return a read-only collection of the active commit log segments
      */
-    Collection<CommitLogSegment> getActiveSegments()
+    @VisibleForTesting
+    public Collection<CommitLogSegment> getActiveSegments()
     {
         return Collections.unmodifiableCollection(activeSegments);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 925efd6..1a15d6f 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.IOException;
+
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public abstract class DiskAwareRunnable extends WrappedRunnable
@@ -26,7 +29,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
     {
         Directories.DataDirectory directory = 
getDirectories().getWriteableLocation(writeSize);
         if (directory == null)
-            throw new RuntimeException("Insufficient disk space to write " + 
writeSize + " bytes");
+            throw new FSWriteError(new IOException("Insufficient disk space to 
write " + writeSize + " bytes"), "");
 
         return directory;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
new file mode 100644
index 0000000..8304aff
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import java.io.IOError;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.BlacklistedDirectories;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories.DataDirectory;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+/**
+ * Test that a memtable flush to unwritable data directories fails with FSWriteError, applies the configured disk failure policy (die/stop/ignore), and leaves the commit log dirty for the affected table.
+ */
+public class OutOfSpaceTest extends CQLTester
+{
+    @Test
+    public void testFlushUnwriteableDie() throws Throwable
+    {
+        makeTable();
+        markDirectoriesUnwriteable();
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = 
JVMStabilityInspector.replaceKiller(killerForTests);
+        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die);
+            flushAndExpectError();
+            Assert.assertTrue(killerForTests.wasKilled());
+            Assert.assertFalse(killerForTests.wasKilledQuietly()); //only 
killed quietly on startup failure
+        }
+        finally
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+
+    @Test
+    public void testFlushUnwriteableStop() throws Throwable
+    {
+        makeTable();
+        markDirectoriesUnwriteable();
+
+        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop);
+            flushAndExpectError();
+            Assert.assertFalse(Gossiper.instance.isEnabled());
+        }
+        finally
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+        }
+    }
+
+    @Test
+    public void testFlushUnwriteableIgnore() throws Throwable
+    {
+        makeTable();
+        markDirectoriesUnwriteable();
+
+        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
+            flushAndExpectError();
+        }
+        finally
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+        }
+
+        // Next flush should succeed.
+        makeTable();
+        flush();
+    }
+
+    public void makeTable() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, 
b));");
+
+        // write a few rows (with null values) so the memtable is non-empty and a flush has something to write
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + 
"', null);");
+    }
+
+    public void markDirectoriesUnwriteable()
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+        try
+        {
+            for ( ; ; )
+            {
+                DataDirectory dir = cfs.directories.getWriteableLocation(1);
+                
BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir));
+            }
+        }
+        catch (IOError e)
+        {
+            // Expected -- marked all directories as unwritable
+        }
+    }
+
+    public void flushAndExpectError() throws InterruptedException, 
ExecutionException
+    {
+        try
+        {
+            
Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get();
+            fail("FSWriteError expected.");
+        }
+        catch (ExecutionException e)
+        {
+            // Correct path.
+            Assert.assertTrue(e.getCause() instanceof FSWriteError);
+        }
+
+        // Make sure commit log wasn't discarded.
+        UUID cfid = currentTableMetadata().cfId;
+        for (CommitLogSegment segment : 
CommitLog.instance.allocator.getActiveSegments())
+            if (segment.getDirtyCFIDs().contains(cfid))
+                return;
+        fail("Expected commit log to remain dirty for the affected table.");
+    }
+}

Reply via email to