Fix out-of-space error treatment in memtable flushing Patch by Branimir Lambov; reviewed by Joshua McKenzie for CASSANDRA-11448
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3b3c410 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3b3c410 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3b3c410 Branch: refs/heads/trunk Commit: f3b3c410a0d84a4348cf05954b38df6b087762a7 Parents: 105fbb3 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Mar 30 16:29:55 2016 +0300 Committer: Josh McKenzie <josh.mcken...@datastax.com> Committed: Fri Apr 1 11:45:18 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 24 ++- .../org/apache/cassandra/db/Directories.java | 2 +- .../db/commitlog/CommitLogSegmentManager.java | 4 +- .../cassandra/io/util/DiskAwareRunnable.java | 5 +- .../apache/cassandra/cql3/OutOfSpaceTest.java | 157 +++++++++++++++++++ 6 files changed, 185 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index af2518c..50bc894 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.14 + * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448). 
* Backport CASSANDRA-10859 (CASSANDRA-11415) * COPY FROM fails when importing blob (CASSANDRA-11375) * Backport CASSANDRA-10679 (CASSANDRA-9598) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 3d66d3a..edc3564 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -967,6 +967,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); final ReplayPosition lastReplayPosition; + volatile FSWriteError flushFailure = null; private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition) { @@ -1009,12 +1010,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // must check lastReplayPosition != null because Flush may find that all memtables are clean // and so not set a lastReplayPosition - if (lastReplayPosition != null) + // If a flush errored out but the error was ignored, make sure we don't discard the commit log. 
+ if (lastReplayPosition != null && flushFailure == null) { CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition); } metric.pendingFlushes.dec(); + + if (flushFailure != null) + throw flushFailure; } } @@ -1114,11 +1119,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean metric.memtableSwitchCount.inc(); - for (Memtable memtable : memtables) + try + { + for (Memtable memtable : memtables) + { + // flush the memtable + MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable()); + reclaim(memtable); + } + } + catch (FSWriteError e) { - // flush the memtable - MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable()); - reclaim(memtable); + JVMStabilityInspector.inspectThrowable(e); + // If we weren't killed, try to continue work but do not allow CommitLog to be discarded. + postFlush.flushFailure = e; } // signal the post-flush we've done our work http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 810c336..35aa447 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -310,7 +310,7 @@ public class Directories if (tooBig) return null; else - throw new IOError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out")); + throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), ""); // shortcut for single data directory systems if (candidates.size() == 1) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 0ea54ab..6838de6 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.*; import org.slf4j.Logger; @@ -554,7 +555,8 @@ public class CommitLogSegmentManager /** * @return a read-only collection of the active commit log segments */ - Collection<CommitLogSegment> getActiveSegments() + @VisibleForTesting + public Collection<CommitLogSegment> getActiveSegments() { return Collections.unmodifiableCollection(activeSegments); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java index 925efd6..1a15d6f 100644 --- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java +++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java @@ -17,7 +17,10 @@ */ package org.apache.cassandra.io.util; +import java.io.IOException; + import org.apache.cassandra.db.Directories; +import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.utils.WrappedRunnable; public abstract class DiskAwareRunnable extends WrappedRunnable @@ -26,7 +29,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable { Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize); if (directory == null) - throw new 
RuntimeException("Insufficient disk space to write " + writeSize + " bytes"); + throw new FSWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), ""); return directory; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3b3c410/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java new file mode 100644 index 0000000..8304aff --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.cql3; + +import static junit.framework.Assert.fail; + +import java.io.IOError; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.Config.DiskFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.BlacklistedDirectories; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories.DataDirectory; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogSegment; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; + +/** + * Test the behaviour of flushing a memtable to an unwriteable (out-of-space) data directory under each disk failure policy. + */ +public class OutOfSpaceTest extends CQLTester +{ + @Test + public void testFlushUnwriteableDie() throws Throwable + { + makeTable(); + markDirectoriesUnwriteable(); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die); + flushAndExpectError(); + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testFlushUnwriteableStop() throws Throwable + { + makeTable(); + markDirectoriesUnwriteable(); + + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + 
DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop); + flushAndExpectError(); + Assert.assertFalse(Gossiper.instance.isEnabled()); + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + } + } + + @Test + public void testFlushUnwriteableIgnore() throws Throwable + { + makeTable(); + markDirectoriesUnwriteable(); + + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore); + flushAndExpectError(); + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + } + + // Next flush should succeed. + makeTable(); + flush(); + } + + public void makeTable() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, b));"); + + // insert some data so the flush below has work to do + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + "', null);"); + } + + public void markDirectoriesUnwriteable() + { + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + try + { + for ( ; ; ) + { + DataDirectory dir = cfs.directories.getWriteableLocation(1); + BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir)); + } + } + catch (IOError e) + { + // Expected -- marked all directories as unwritable + } + } + + public void flushAndExpectError() throws InterruptedException, ExecutionException + { + try + { + Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get(); + fail("FSWriteError expected."); + } + catch (ExecutionException e) + { + // Correct path. + Assert.assertTrue(e.getCause() instanceof FSWriteError); + } + + // Make sure commit log wasn't discarded. 
+ UUID cfid = currentTableMetadata().cfId; + for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments()) + if (segment.getDirtyCFIDs().contains(cfid)) + return; + fail("Expected commit log to remain dirty for the affected table."); + } +}