Merge commit failure policy into 2.1. Patch by Benedict Elliott Smith; reviewed by Marcus Eriksson for CASSANDRA-7429
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3606e1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3606e1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3606e1 Branch: refs/heads/trunk Commit: da3606e180411e88774a9f24e7884147a202fea8 Parents: b8ee9da Author: Benedict Elliott Smith <[email protected]> Authored: Mon Jun 30 13:00:42 2014 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Mon Jun 30 13:00:42 2014 +0100 ---------------------------------------------------------------------- conf/cassandra.yaml | 8 ++ .../org/apache/cassandra/config/Config.java | 8 ++ .../cassandra/config/DatabaseDescriptor.java | 10 +++ .../db/commitlog/AbstractCommitLogService.java | 4 +- .../cassandra/db/commitlog/CommitLog.java | 21 +++++ .../db/commitlog/CommitLogSegmentManager.java | 88 +++++++++++--------- .../org/apache/cassandra/io/util/FileUtils.java | 13 --- .../org/apache/cassandra/db/CommitLogTest.java | 31 +++++++ 8 files changed, 130 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 64b5987..f1e5576 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -113,6 +113,14 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner # ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra disk_failure_policy: stop +# policy for commit disk failures: +# stop: shut down gossip and Thrift, leaving the node effectively dead, but +# can still be inspected via JMX. 
+# stop_commit: shutdown the commit log, letting writes collect but +# continuing to service reads, as in pre-2.0.5 Cassandra +# ignore: ignore fatal errors and let the batches fail +commit_failure_policy: stop + # Maximum size of the key cache in memory. # # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index c5e2b76..4d4a95b 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -57,6 +57,7 @@ public class Config public DiskAccessMode disk_access_mode = DiskAccessMode.auto; public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore; + public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop; /* initial token in the ring */ public String initial_token; @@ -300,6 +301,13 @@ public class Config stop_paranoid, } + public static enum CommitFailurePolicy + { + stop, + stop_commit, + ignore, + } + public static enum RequestSchedulerId { keyspace http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 15c8c09..e8ce9df 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1192,6 +1192,16 @@ public class DatabaseDescriptor return conf.disk_failure_policy; } + public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy) + { + conf.commit_failure_policy = policy; + } + + public 
static Config.CommitFailurePolicy getCommitFailurePolicy() + { + return conf.commit_failure_policy; + } + public static boolean isSnapshotBeforeCompaction() { return conf.snapshot_before_compaction; http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 94802bf..59bf691 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -122,7 +122,9 @@ public abstract class AbstractCommitLogService } catch (Throwable t) { - logger.error("Commit log sync failed", t); + if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) + break; + // sleep for full poll-interval after an error, so we don't spam the log file try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index eaa1b3c..41c01c3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -21,10 +21,12 @@ import java.io.*; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +37,7 @@ import 
org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.DataOutputByteBuffer; import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.PureJavaCrc32; import static org.apache.cassandra.db.commitlog.CommitLogSegment.*; @@ -343,4 +346,22 @@ public class CommitLog implements CommitLogMBean { return allocator.getActiveSegments().size(); } + + static boolean handleCommitError(String message, Throwable t) + { + switch (DatabaseDescriptor.getCommitFailurePolicy()) + { + case stop: + StorageService.instance.stopTransports(); + case stop_commit: + logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t); + return false; + case ignore: + logger.error(message, t); + return true; + default: + throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy()); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 117b9d1..ed0a7ff 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.collect.Iterables; @@ -101,56 +102,65 @@ public class CommitLogSegmentManager { while (run) { - Callable<CommitLogSegment> task = 
segmentManagementTasks.poll(); - if (task == null) + try { - // if we have no more work to do, check if we should create a new segment - if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) + Callable<CommitLogSegment> task = segmentManagementTasks.poll(); + if (task == null) { - logger.debug("No segments in reserve; creating a fresh one"); - size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize()); - // TODO : some error handling in case we fail to create a new segment - availableSegments.add(CommitLogSegment.freshSegment()); - hasAvailableSegments.signalAll(); - } + // if we have no more work to do, check if we should create a new segment + if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) + { + logger.debug("No segments in reserve; creating a fresh one"); + size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize()); + // TODO : some error handling in case we fail to create a new segment + availableSegments.add(CommitLogSegment.freshSegment()); + hasAvailableSegments.signalAll(); + } - // flush old Cfs if we're full - long unused = unusedCapacity(); - if (unused < 0) - { - List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); - long spaceToReclaim = 0; - for (CommitLogSegment segment : activeSegments) + // flush old Cfs if we're full + long unused = unusedCapacity(); + if (unused < 0) { - if (segment == allocatingFrom) - break; - segmentsToRecycle.add(segment); - spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize(); - if (spaceToReclaim + unused >= 0) - break; + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); + long spaceToReclaim = 0; + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + segmentsToRecycle.add(segment); + spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize(); + if (spaceToReclaim + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); } - flushDataFrom(segmentsToRecycle, 
false); - } - try - { - // wait for new work to be provided - task = segmentManagementTasks.take(); + try + { + // wait for new work to be provided + task = segmentManagementTasks.take(); + } + catch (InterruptedException e) + { + // shutdown signal; exit cleanly + continue; + } } - catch (InterruptedException e) + + CommitLogSegment recycled = task.call(); + if (recycled != null) { - // shutdown signal; exit cleanly - continue; + // if the work resulted in a segment to recycle, publish it + availableSegments.add(recycled); + hasAvailableSegments.signalAll(); } } - - // TODO : some error handling in case we fail on executing call (e.g. recycling) - CommitLogSegment recycled = task.call(); - if (recycled != null) + catch (Throwable t) { - // if the work resulted in a segment to recycle, publish it - availableSegments.add(recycled); - hasAvailableSegments.signalAll(); + if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) + return; + // sleep some arbitrary period to avoid spamming CL + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 875c9d5..41b7aa3 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -390,19 +390,6 @@ public class FileUtils } } - public static void skipBytesFully(DataInput in, long bytes) throws IOException - { - long n = 0; - while (n < bytes) - { - int m = (int) Math.min(Integer.MAX_VALUE, bytes - n); - int skipped = in.skipBytes(m); - if (skipped == 0) - throw new EOFException("EOF after " + n + " bytes out of " + bytes); - n += skipped; - } - } - public static void handleCorruptSSTable(CorruptSSTableException e) { if 
(DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid) http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index 660e91e..7046536 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -22,9 +22,11 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Assert; import org.junit.Test; @@ -37,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor; import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -264,4 +267,32 @@ public class CommitLogTest extends SchemaLoader String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); } + + @Test + public void testCommitFailurePolicy_stop() + { + File commitDir = new File(DatabaseDescriptor.getCommitLogLocation()); + + try + { + + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); + commitDir.setWritable(false); + Mutation rm = new Mutation("Keyspace1", bytes("k")); + rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0); + + // Adding it twice (won't change segment) + CommitLog.instance.add(rm); + 
Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS); + Assert.assertFalse(StorageService.instance.isRPCServerRunning()); + Assert.assertFalse(StorageService.instance.isNativeTransportRunning()); + Assert.assertFalse(StorageService.instance.isInitialized()); + + } + finally + { + commitDir.setWritable(true); + } + } + }
