Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 db9623904 -> a5bc52eee refs/heads/trunk b1c45b7f3 -> 4d282ca5d
Revert "Fail to start if commit log replay encounters an exception" This reverts commit 581ce631026b98ee9438d54ef144df89bc91100b. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5bc52ee Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5bc52ee Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5bc52ee Branch: refs/heads/cassandra-2.1 Commit: a5bc52eee90e342efcdc53282612008d3dbaeaeb Parents: db96239 Author: Jonathan Ellis <[email protected]> Authored: Tue Jul 29 11:57:34 2014 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Jul 29 11:57:34 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 - .../cassandra/db/commitlog/CommitLog.java | 17 +-- .../db/commitlog/CommitLogDescriptor.java | 8 +- .../db/commitlog/CommitLogReplayer.java | 76 +++-------- .../commitlog/MalformedCommitLogException.java | 16 --- .../cassandra/service/CassandraDaemon.java | 2 - .../org/apache/cassandra/db/CommitLogTest.java | 133 ++++--------------- 7 files changed, 48 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 64f9793..1a2dc57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,6 @@ 2.1.1 * Pig support for hadoop CqlInputFormat (CASSANDRA-6454) * Add listen_interface and rpc_interface options (CASSANDRA-7417) - * Fail to start if commit log replay detects a problem (CASSANDRA-7125) * Improve schema merge performance (CASSANDRA-7444) * Adjust MT depth based on # of partition validating (CASSANDRA-5263) * Optimise NativeCell comparisons (CASSANDRA-6755) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index a1be25d..d2a5fa7 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -131,20 +131,9 @@ public class CommitLog implements CommitLogMBean */ public int recover(File... clogs) throws IOException { - try - { - CommitLogReplayer recovery = new CommitLogReplayer(); - recovery.recover(clogs); - return recovery.blockForWrites(); - } - catch (IOException e) - { - if (e instanceof UnknownColumnFamilyException) - logger.error("Commit log replay failed due to replaying a mutation for a missing table. This error can be ignored by providing -Dcassandra.commitlog.stop_on_missing_tables=false on the command line"); - if (e instanceof MalformedCommitLogException) - logger.error("Commit log replay failed due to a non-fatal exception. This error can be ignored by providing -Dcassandra.commitlog.stop_on_errors=false on the command line"); - throw e; - } + CommitLogReplayer recovery = new CommitLogReplayer(); + recovery.recover(clogs); + return recovery.blockForWrites(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index 77c25d3..91c81e1 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -28,8 +28,6 @@ import java.nio.ByteBuffer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.annotations.VisibleForTesting; - import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; @@ -50,11 +48,10 @@ public class CommitLogDescriptor * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes. * Note: make sure to handle {@link #getMessagingVersion()} */ - @VisibleForTesting public static final int current_version = VERSION_21; // [version, id, checksum] - public static final int HEADER_SIZE = 4 + 8 + 4; + static final int HEADER_SIZE = 4 + 8 + 4; final int version; public final long id; @@ -70,8 +67,7 @@ public class CommitLogDescriptor this(current_version, id); } - @VisibleForTesting - public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) + static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) { out.putInt(0, descriptor.version); out.putLong(4, descriptor.id); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 10d13b2..1012829 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -23,7 +23,6 @@ import java.util.*; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; @@ -49,8 +48,6 @@ public class CommitLogReplayer private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024; private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; - private static boolean IGNORE_ERRORS = System.getProperty("cassandra.commitlog.stop_on_errors", "true").equals("false"); - private static boolean IGNORE_MISSING_TABLES = IGNORE_ERRORS || System.getProperty("cassandra.commitlog.stop_on_missing_tables", "true").equals("false"); private final Set<Keyspace> keyspacesRecovered; private final List<Future<?>> futures; @@ -63,16 +60,16 @@ public class CommitLogReplayer public CommitLogReplayer() { - this.keyspacesRecovered = new NonBlockingHashSet<>(); - this.futures = new ArrayList<>(); + this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); + this.futures = new ArrayList<Future<?>>(); this.buffer = new byte[4096]; - this.invalidMutations = new HashMap<>(); + this.invalidMutations = new HashMap<UUID, AtomicInteger>(); // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. this.replayedCount = new AtomicInteger(); this.checksum = new PureJavaCrc32(); // compute per-CF and global replay positions - cfPositions = new HashMap<>(); + cfPositions = new HashMap<UUID, ReplayPosition>(); Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { @@ -120,12 +117,7 @@ public class CommitLogReplayer if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE) { if (offset != reader.length() && offset != Integer.MAX_VALUE) - { - String message = String.format("Encountered bad header at position %d of Commit log %s; not enough room for a header", offset, reader.getPath()); - if (!IGNORE_ERRORS) - throw new MalformedCommitLogException(message); - logger.warn(message); - } + logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath()); // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment return -1; } @@ -144,19 +136,13 @@ public class CommitLogReplayer { if (end != 0 || filecrc != 0) { - String message = String.format("Encountered bad header at position %d of Commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath()); - if (!IGNORE_ERRORS) - throw new MalformedCommitLogException(message); - logger.warn(message); + logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath()); } return -1; } else if (end < offset || end > reader.length()) { - String message = String.format("Encountered bad header at position %d of Commit log %s, with bad position but valid CRC.", offset, reader.getPath()); - if (!IGNORE_ERRORS) - throw new MalformedCommitLogException(message); - logger.warn(message); + logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath()); return -1; } return end; @@ -285,9 +271,8 @@ public class CommitLogReplayer /* read the logs populate Mutation and apply */ while (reader.getPosition() < end && !reader.isEOF()) { - long mutationStart = reader.getFilePointer(); if (logger.isDebugEnabled()) - logger.debug("Reading mutation at {}", mutationStart); + logger.debug("Reading mutation at {}", reader.getFilePointer()); long claimedCRC32; int serializedSize; @@ -297,7 +282,7 @@ public class CommitLogReplayer serializedSize = reader.readInt(); if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) { - logger.debug("Encountered end of segment marker at {}", mutationStart); + logger.debug("Encountered end of segment marker at {}", reader.getFilePointer()); break main; } @@ -306,11 +291,7 @@ public class CommitLogReplayer // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 if (serializedSize < 10) - { - if (!IGNORE_ERRORS) - throw new MalformedCommitLogException("Too small mutation encountered at position " + mutationStart); break main; - } long claimedSizeChecksum; if (desc.version < CommitLogDescriptor.VERSION_21) @@ -324,11 +305,7 @@ public class CommitLogReplayer checksum.updateInt(serializedSize); if (checksum.getValue() != claimedSizeChecksum) - { - if (!IGNORE_ERRORS) - throw new IOException("Invalid size checksum for mutation at position " + mutationStart + " of " + file); break main; // entry wasn't synced correctly/fully. that's - } // ok. if (serializedSize > buffer.length) @@ -341,17 +318,12 @@ public class CommitLogReplayer } catch (EOFException eof) { - if (!IGNORE_ERRORS) - throw new MalformedCommitLogException("Encountered end-of-file unexpectedly", eof); - break main; // last CL entry didn't get completely written. that's ok. } checksum.update(buffer, 0, serializedSize); if (claimedCRC32 != checksum.getValue()) { - if (!IGNORE_ERRORS) - throw new IOException("Invalid checksum for mutation at position " + mutationStart + " of " + file); // this entry must not have been fsynced. probably the rest is bad too, // but just in case there is no harm in trying them (since we still read on an entry boundary) continue; @@ -372,9 +344,6 @@ public class CommitLogReplayer } catch (UnknownColumnFamilyException ex) { - if (!IGNORE_MISSING_TABLES) - throw ex; - if (ex.cfId == null) continue; AtomicInteger i = invalidMutations.get(ex.cfId); @@ -389,14 +358,16 @@ public class CommitLogReplayer } catch (Throwable t) { - if (!IGNORE_ERRORS) - throw new MalformedCommitLogException("Encountered bad mutation", t); - File f = File.createTempFile("mutation", "dat"); - try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) + DataOutputStream out = new DataOutputStream(new FileOutputStream(f)); + try { out.write(buffer, 0, serializedSize); } + finally + { + out.close(); + } String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ", f.getAbsolutePath()); logger.error(st, t); @@ -412,11 +383,7 @@ public class CommitLogReplayer public void runMayThrow() throws IOException { if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) - { - if (!IGNORE_MISSING_TABLES) - throw new UnknownColumnFamilyException("Keyspace for this table is missing", mutation.getColumnFamilyIds().iterator().next()); return; - } if (pointInTimeExceeded(mutation)) return; @@ -431,12 +398,7 @@ public class CommitLogReplayer for (ColumnFamily columnFamily : replayFilter.filter(mutation)) { if (Schema.instance.getCF(columnFamily.id()) == null) - { - if (!IGNORE_MISSING_TABLES) - throw new UnknownColumnFamilyException("Missing table with cfid=" + columnFamily.id(), - mutation.getColumnFamilyIds().iterator().next()); continue; // dropped - } ReplayPosition rp = cfPositions.get(columnFamily.id()); @@ -453,7 +415,7 @@ public class CommitLogReplayer if (newMutation != null) { assert !newMutation.isEmpty(); - keyspace.apply(newMutation, false); + Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false); keyspacesRecovered.add(keyspace); } } @@ -491,10 +453,4 @@ public class CommitLogReplayer } return false; } - - @VisibleForTesting - public static void setIgnoreErrors(boolean ignore) - { - IGNORE_ERRORS = IGNORE_MISSING_TABLES = ignore; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java deleted file mode 100644 index 84a5cb0..0000000 --- a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.apache.cassandra.db.commitlog; - -import java.io.IOException; - -// represents a non-fatal commit log replay exception (i.e. can be skipped with -Dcassandra.commitlog.ignoreerrors=true) -public class MalformedCommitLogException extends IOException -{ - public MalformedCommitLogException(String message) - { - super(message); - } - public MalformedCommitLogException(String message, Throwable cause) - { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 07c6cc4..fbee7ce 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -47,8 +47,6 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.UnknownColumnFamilyException; -import org.apache.cassandra.db.commitlog.MalformedCommitLogException; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.exceptions.ConfigurationException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index dd05272..7046536 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -36,53 +36,46 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogDescriptor; -import org.apache.cassandra.db.commitlog.CommitLogReplayer; import org.apache.cassandra.db.commitlog.CommitLogSegment; -import org.apache.cassandra.db.commitlog.MalformedCommitLogException; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.PureJavaCrc32; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; public class CommitLogTest extends SchemaLoader { - - static - { - System.setProperty("cassandra.commitlog.stop_on_errors", "true"); - } - @Test public void testRecoveryWithEmptyLog() throws Exception { - testMalformed(badLogFile(new byte[0])); + CommitLog.instance.recover(new File[]{ tmpFile() }); } @Test public void testRecoveryWithShortLog() throws Exception { // force EOF while reading log - testMalformed(badLogFile(100, 10)); + testRecoveryWithBadSizeArgument(100, 10); } @Test public void testRecoveryWithShortSize() throws Exception { - testMalformed(new byte[2]); + testRecovery(new byte[2]); } @Test public void testRecoveryWithShortCheckSum() throws Exception { - testMalformed(new byte[6]); + testRecovery(new byte[6]); } @Test public void testRecoveryWithGarbageLog() throws Exception { - testMalformed(garbage(100)); + byte[] garbage = new byte[100]; + (new java.util.Random()).nextBytes(garbage); + testRecovery(garbage); } @Test @@ -90,30 +83,21 @@ public class CommitLogTest extends SchemaLoader { Checksum checksum = new CRC32(); checksum.update(100); - testMalformed(badLogFile(100, checksum.getValue(), new byte[100])); - testMalformed(badLogFile(100, checksum.getValue(), garbage(100))); - } - - @Test - public void testRecoveryWithBadSize() throws Exception - { - Checksum checksum = new CRC32(); - checksum.update(100); - testMalformed(badLogFile(120, checksum.getValue(), garbage(100))); + testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue()); } @Test public void testRecoveryWithZeroSegmentSizeArgument() throws Exception { // many different combinations of 4 bytes (garbage) will be read as zero by readInt() - testMalformed(badLogFile(0, -1L, 10)); // zero size, but no EOF + testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF } @Test public void testRecoveryWithNegativeSizeArgument() throws Exception { // garbage from a partial/bad flush could be read as a negative size even if there is no EOF - testMalformed(badLogFile(-10, 10)); // zero size, but no EOF + testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF } @Test @@ -190,8 +174,8 @@ public class CommitLogTest extends SchemaLoader private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column) { - Mutation rm = new Mutation(keyspace, key); - rm.add(table, column, ByteBuffer.allocate(0), 0); + Mutation rm = new Mutation("Keyspace1", bytes("k")); + rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0); int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2); max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead @@ -231,73 +215,22 @@ public class CommitLogTest extends SchemaLoader } } - // construct log file with correct chunk checksum for the provided size/position - protected File badLogFile(int markerSize, int realSize) throws Exception - { - return badLogFile(markerSize, garbage(realSize)); - } - - protected File badLogFile(int markerSize, byte[] data) throws Exception - { - File logFile = tmpFile(); - CommitLogDescriptor descriptor = CommitLogDescriptor.fromFileName(logFile.getName()); - PureJavaCrc32 crc = new PureJavaCrc32(); - crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL)); - crc.updateInt((int) (descriptor.id >>> 32)); - crc.updateInt(CommitLogDescriptor.HEADER_SIZE); - return badLogFile(markerSize, crc.getCrc(), data, logFile); - } - - protected byte[] garbage(int size) - { - byte[] garbage = new byte[size]; - (new java.util.Random()).nextBytes(garbage); - return garbage; - } - - protected File badLogFile(int markerSize, long checksum, int realSize) throws Exception - { - return badLogFile(markerSize, checksum, realSize, tmpFile()); - } - - protected File badLogFile(int markerSize, long checksum, int realSize, File logFile) throws Exception + protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception { - return badLogFile(markerSize, checksum, new byte[realSize], logFile); - } - - protected File badLogFile(int markerSize, long checksum, byte[] chunk) throws Exception - { - return badLogFile(markerSize, checksum, chunk, tmpFile()); + Checksum checksum = new CRC32(); + checksum.update(size); + testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue()); } - protected File badLogFile(int markerSize, long checksum, byte[] chunk, File logFile) throws Exception + protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(out); - ByteBuffer buffer = ByteBuffer.allocate(CommitLogDescriptor.HEADER_SIZE); - CommitLogDescriptor.writeHeader(buffer, CommitLogDescriptor.fromFileName(logFile.getName())); - out.write(buffer.array()); - dout.writeInt(markerSize); + dout.writeInt(size); dout.writeLong(checksum); - dout.write(chunk); + dout.write(new byte[dataSize]); dout.close(); - try (OutputStream lout = new FileOutputStream(logFile)) - { - lout.write(out.toByteArray()); - lout.close(); - } - return logFile; - } - - protected File badLogFile(byte[] contents) throws Exception - { - File logFile = tmpFile(); - try (OutputStream lout = new FileOutputStream(logFile)) - { - lout.write(contents); - lout.close(); - } - return logFile; + testRecovery(out.toByteArray()); } protected File tmpFile() throws IOException @@ -308,29 +241,17 @@ public class CommitLogTest extends SchemaLoader return logFile; } - private void testMalformed(byte[] contents) throws Exception - { - testMalformed(badLogFile(contents)); - testMalformed(badLogFile(contents.length, contents)); - } - - private void testMalformed(File logFile) throws Exception + protected void testRecovery(byte[] logData) throws Exception { - CommitLogReplayer.setIgnoreErrors(true); - CommitLog.instance.recover(new File[]{ logFile }); - CommitLogReplayer.setIgnoreErrors(false); - try - { - CommitLog.instance.recover(new File[]{ logFile }); - Assert.assertFalse(true); - } - catch (Throwable t) + File logFile = tmpFile(); + try (OutputStream lout = new FileOutputStream(logFile)) { - if (!(t instanceof MalformedCommitLogException)) - throw t; + lout.write(logData); + //statics make it annoying to test things correctly + CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ } } - + @Test public void testVersions() {
