Updated Branches:
  refs/heads/cassandra-1.1 044e17a22 -> 5923d3295
  refs/heads/trunk       0cc97d91c -> 64305dd9f
merge from 1.1

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit:  http://git-wip-us.apache.org/repos/asf/cassandra/commit/64305dd9
Tree:    http://git-wip-us.apache.org/repos/asf/cassandra/tree/64305dd9
Diff:    http://git-wip-us.apache.org/repos/asf/cassandra/diff/64305dd9

Branch: refs/heads/trunk
Commit: 64305dd9f9e608b51ce521158269c6c7222def24
Parents: 0cc97d9 5923d32
Author: Jonathan Ellis <[email protected]>
Authored: Mon Apr 9 11:28:33 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Mon Apr 9 11:29:49 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                          |    2 +
 conf/cassandra.yaml                                  |    8 +
 conf/commitlog_archiving.properties                  |   37 ++
 src/java/org/apache/cassandra/config/Config.java     |    1 +
 .../cassandra/config/DatabaseDescriptor.java         |    8 +
 .../apache/cassandra/db/commitlog/CommitLog.java     |  253 ++------------
 .../cassandra/db/commitlog/CommitLogAllocator.java   |   20 +-
 .../cassandra/db/commitlog/CommitLogArchiver.java    |  147 ++++++++
 .../cassandra/db/commitlog/CommitLogMBean.java       |   17 +
 .../cassandra/db/commitlog/CommitLogReplayer.java    |  269 +++++++++++++++
 .../cassandra/db/commitlog/CommitLogSegment.java     |   14 +-
 src/java/org/apache/cassandra/utils/CLibrary.java    |   33 +--
 .../org/apache/cassandra/utils/FBUtilities.java      |   34 ++-
 13 files changed, 580 insertions(+), 263 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2c42ced,b80368a..eb97992
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,6 +1,13 @@@
 +1.2-dev
 + * Track tombstone expiration and compact when tombstone content is
 +   higher than a configurable threshold, default 20% (CASSANDRA-3442)
 + * update MurmurHash to version 3 (CASSANDRA-2975)
 + * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
 +
+ 1.1.1-dev
+  * add support for commitlog archiving and point-in-time recovery
+    (CASSANDRA-3647)
  * update caches to use byte[] keys to reduce memory overhead (CASSANDRA-3966)
  * add column limit to cli (CASSANDRA-3012, 4098)
  * clean up and optimize DataOutputBuffer, used by CQL compression and

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 5b3c7b0,3c34772..2305dd5
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -398,8 -201,8 +200,8 @@@ public class CommitLog implements Commi
       */
      public void add(RowMutation rm) throws IOException
      {
-         long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.version_) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
+         long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.current_version) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
-         if (totalSize > CommitLog.SEGMENT_SIZE)
+         if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
          {
              logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
              return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 0000000,eb997fc..488af20
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -1,0 -1,269 +1,269 @@@
+ package org.apache.cassandra.db.commitlog;
+
+ import java.io.DataInputStream;
+ import java.io.EOFException;
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.zip.Checksum;
+
+ import org.apache.cassandra.concurrent.Stage;
+ import org.apache.cassandra.concurrent.StageManager;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.db.ColumnFamily;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.RowMutation;
+ import org.apache.cassandra.db.Table;
+ import org.apache.cassandra.db.UnknownColumnFamilyException;
+ import org.apache.cassandra.io.IColumnSerializer;
+ import org.apache.cassandra.io.util.FastByteArrayInputStream;
+ import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.RandomAccessReader;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.PureJavaCrc32;
+ import org.apache.cassandra.utils.WrappedRunnable;
+ import org.apache.commons.lang.StringUtils;
+ import org.cliffc.high_scale_lib.NonBlockingHashSet;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.common.collect.Ordering;
+
+ public class CommitLogReplayer
+ {
+     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
+     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+
+     private final Set<Table> tablesRecovered;
+     private final List<Future<?>> futures;
+     private final Map<Integer, AtomicInteger> invalidMutations;
+     private final AtomicInteger replayedCount;
+     private final Map<Integer, ReplayPosition> cfPositions;
+     private final ReplayPosition globalPosition;
+     private final Checksum checksum;
+     private byte[] buffer;
+
+     public CommitLogReplayer()
+     {
+         this.tablesRecovered = new NonBlockingHashSet<Table>();
+         this.futures = new ArrayList<Future<?>>();
+         this.buffer = new byte[4096];
+         this.invalidMutations = new HashMap<Integer, AtomicInteger>();
+         // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
+         this.replayedCount = new AtomicInteger();
+         // compute per-CF and global replay positions
+         this.cfPositions = new HashMap<Integer, ReplayPosition>();
+         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+         {
+             // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
+             // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
+             // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
+             ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
+             cfPositions.put(cfs.metadata.cfId, rp);
+         }
+         this.globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
+         this.checksum = new PureJavaCrc32();
+     }
+
+     public void recover(File[] clogs) throws IOException
+     {
+         for (final File file : clogs)
+             recover(file);
+     }
+
+     public int blockForWrites() throws IOException
+     {
+         for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
+             logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
+
+         // wait for all the writes to finish on the mutation stage
+         FBUtilities.waitOnFutures(futures);
+         logger.debug("Finished waiting on mutations from recovery");
+
+         // flush replayed tables
+         futures.clear();
+         for (Table table : tablesRecovered)
+             futures.addAll(table.flush());
+         FBUtilities.waitOnFutures(futures);
+         return replayedCount.get();
+     }
+
+     public void recover(File file) throws IOException
+     {
+         logger.info("Replaying " + file.getPath());
+         final long segment = CommitLogSegment.idFromFilename(file.getName());
+         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
+         assert reader.length() <= Integer.MAX_VALUE;
+         try
+         {
+             int replayPosition;
+             if (globalPosition.segment < segment)
+                 replayPosition = 0;
+             else if (globalPosition.segment == segment)
+                 replayPosition = globalPosition.position;
+             else
+                 replayPosition = (int) reader.length();
+
+             if (replayPosition < 0 || replayPosition >= reader.length())
+             {
+                 // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog
+                 // (see https://issues.apache.org/jira/browse/CASSANDRA-2285)
+                 logger.debug("skipping replay of fully-flushed {}", file);
+                 return;
+             }
+
+             reader.seek(replayPosition);
+
+             if (logger.isDebugEnabled())
+                 logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
+
+             /* read the log, populate RowMutations and apply them */
+             while (!reader.isEOF())
+             {
+                 if (logger.isDebugEnabled())
+                     logger.debug("Reading mutation at " + reader.getFilePointer());
+
+                 long claimedCRC32;
+                 int serializedSize;
+                 try
+                 {
+                     // any of the reads may hit EOF
+                     serializedSize = reader.readInt();
+                     if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
+                     {
+                         logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
+                         break;
+                     }
+
+                     // RowMutation must be at LEAST 10 bytes:
+                     // 3 each for a non-empty Table and Key (including the
+                     // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+                     // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+                     if (serializedSize < 10)
+                         break;
+                     long claimedSizeChecksum = reader.readLong();
+                     checksum.reset();
+                     checksum.update(serializedSize);
+                     if (checksum.getValue() != claimedSizeChecksum)
+                         break; // entry wasn't synced correctly/fully. that's ok.
+
+                     if (serializedSize > buffer.length)
+                         buffer = new byte[(int) (1.2 * serializedSize)];
+                     reader.readFully(buffer, 0, serializedSize);
+                     claimedCRC32 = reader.readLong();
+                 }
+                 catch (EOFException eof)
+                 {
+                     break; // last CL entry didn't get completely written. that's ok.
+                 }
+
+                 checksum.update(buffer, 0, serializedSize);
+                 if (claimedCRC32 != checksum.getValue())
+                 {
+                     // this entry must not have been fsynced. probably the rest is bad too,
+                     // but just in case, there is no harm in trying them (since we still read on an entry boundary)
+                     continue;
+                 }
+
+                 /* deserialize the commit log entry */
+                 FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+                 RowMutation rm;
+                 try
+                 {
+                     // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+                     // the current version. So do make sure the CL is drained prior to upgrading a node.
-                     rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
++                    rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.current_version, IColumnSerializer.Flag.LOCAL);
+                 }
+                 catch (UnknownColumnFamilyException ex)
+                 {
+                     AtomicInteger i = invalidMutations.get(ex.cfId);
+                     if (i == null)
+                     {
+                         i = new AtomicInteger(1);
+                         invalidMutations.put(ex.cfId, i);
+                     }
+                     else
+                         i.incrementAndGet();
+                     continue;
+                 }
+
+                 if (logger.isDebugEnabled())
+                     logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
+                                                + "}"));
+
+                 final long entryLocation = reader.getFilePointer();
+                 final RowMutation frm = rm;
+                 Runnable runnable = new WrappedRunnable()
+                 {
+                     public void runMayThrow() throws IOException
+                     {
+                         if (Schema.instance.getKSMetaData(frm.getTable()) == null)
+                             return;
+                         if (pointInTimeExceeded(frm))
+                             return;
+
+                         final Table table = Table.open(frm.getTable());
+                         RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
+
+                         // Rebuild the row mutation, omitting column families that
+                         // a) have already been flushed,
+                         // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect;
+                         //    do everything based on the cfid instead.
+                         for (ColumnFamily columnFamily : frm.getColumnFamilies())
+                         {
+                             if (Schema.instance.getCF(columnFamily.id()) == null)
+                                 // null means the cf has been dropped
+                                 continue;
+
+                             ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+                             // replay if the current segment is newer than the last flushed one, or,
+                             // if it is the same segment, if this entry comes after the replay position
+                             if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+                             {
+                                 newRm.add(columnFamily);
+                                 replayedCount.incrementAndGet();
+                             }
+                         }
+                         if (!newRm.isEmpty())
+                         {
+                             Table.open(newRm.getTable()).apply(newRm, false);
+                             tablesRecovered.add(table);
+                         }
+                     }
+                 };
+                 futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+                 if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                 {
+                     FBUtilities.waitOnFutures(futures);
+                     futures.clear();
+                 }
+             }
+         }
+         finally
+         {
+             FileUtils.closeQuietly(reader);
+             logger.info("Finished reading " + file);
+         }
+     }
+
+     protected boolean pointInTimeExceeded(RowMutation frm)
+     {
+         long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
+
+         for (ColumnFamily families : frm.getColumnFamilies())
+         {
+             if (families.maxTimestamp() > restoreTarget)
+                 return true;
+         }
+         return false;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/64305dd9/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
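
A note for anyone trying the point-in-time recovery from this commit: the pointInTimeExceeded() check in the replayer above is driven by the new conf/commitlog_archiving.properties listed in the diffstat (the file's contents are not shown in this mail). As a rough illustration only -- the exact key names, substitution variables and timestamp format below are assumptions and should be checked against the file this commit actually ships for CASSANDRA-3647 -- a restore configuration looks something like:

    # run for every closed segment; leave blank to disable archiving
    # %path / %name substitutions for the segment's path and file name (assumed)
    archive_command=/bin/cp %path /backup/commitlog/%name

    # run at startup for every file found under restore_directories
    # %from / %to substitutions for the archived file and the live commitlog directory (assumed)
    restore_command=/bin/cp -f %from %to
    restore_directories=/backup/commitlog

    # replay mutations created up to and including this timestamp;
    # format assumed to be yyyy:MM:dd HH:mm:ss, blank to replay everything
    restore_point_in_time=2012:04:09 11:00:00

Archiving has to have been enabled (and the archived segments placed where restore_command can reach them) before a restore is attempted; per the code above, the replayer only skips mutations whose column timestamps are later than the restore target, it cannot recover segments that were never archived.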

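For anyone skimming just the replay loop rather than the whole diff: each commitlog entry is framed as [int length][CRC of the length][serialized RowMutation][CRC over length + payload], and the replayer treats a checksum mismatch or short read as the end of usable data rather than an error. The snippet below is a minimal, self-contained sketch of that read-side validation only; the class name is made up, and java.util.zip.CRC32 stands in for Cassandra's PureJavaCrc32 (both implement java.util.zip.Checksum). It illustrates the framing the code above implies, it is not the actual reader or writer.

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.zip.CRC32;

    public class CommitLogEntryFramingSketch
    {
        /**
         * Reads one entry framed as [int size][long crc(size)][payload][long crc(size + payload)].
         * Returns the payload, or null if the entry cannot be validated (truncated, unsynced, or garbage).
         */
        public static byte[] readEntry(DataInputStream in) throws IOException
        {
            CRC32 checksum = new CRC32();
            try
            {
                int size = in.readInt();
                if (size < 10)
                    return null; // smaller than any legal RowMutation; treat as garbage

                long claimedSizeChecksum = in.readLong();
                checksum.update(size); // Checksum.update(int) hashes only the low-order byte;
                                       // what matters is that writer and reader make the same call
                if (checksum.getValue() != claimedSizeChecksum)
                    return null; // size header wasn't fully synced

                byte[] payload = new byte[size];
                in.readFully(payload);
                long claimedCRC32 = in.readLong();

                // no reset() here: the payload CRC continues from the size CRC, as in the replay loop above
                checksum.update(payload, 0, size);
                if (checksum.getValue() != claimedCRC32)
                    return null; // torn entry: written but never fsynced

                return payload;
            }
            catch (EOFException eof)
            {
                return null; // last entry was cut off mid-write; expected after a crash
            }
        }
    }

The 10-byte floor and the separate checksum over the length field are what keep replay from chasing a garbage length value into the middle of the file -- the CASSANDRA-2128 case called out in the comments above.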