Merge branch 'cassandra-3.0' into cassandra-3.3
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5669c696 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5669c696 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5669c696 Branch: refs/heads/trunk Commit: 5669c6967bbdd540f27aeebf5a2c258bc4defbe3 Parents: de24ee9 b9bdd9e Author: Carl Yeksigian <c...@apache.org> Authored: Wed Feb 3 09:44:01 2016 -0500 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Feb 3 09:44:01 2016 -0500 ---------------------------------------------------------------------- src/java/org/apache/cassandra/db/Mutation.java | 2 +- src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java | 2 +- src/java/org/apache/cassandra/service/paxos/PaxosState.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5669c696/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5669c696/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 985a036,7169b2f..e0518a9 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -91,74 -85,6 +91,74 @@@ public class CommitLogReplaye private final ReplayFilter replayFilter; private final CommitLogArchiver archiver; + /* + * Wrapper around initiating mutations read from the log to make it possible + * to spy on initiated mutations for test + */ + @VisibleForTesting + public static class MutationInitiator + { + protected Future<Integer> initiateMutation(final Mutation mutation, + final long segmentId, + final int serializedSize, + final long entryLocation, + final CommitLogReplayer clr) + { + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() + { + if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + return; + if (clr.pointInTimeExceeded(mutation)) + return; + + final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + + // Rebuild the mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. + Mutation newMutation = null; + for (PartitionUpdate update : clr.replayFilter.filter(mutation)) + { + if (Schema.instance.getCF(update.metadata().cfId) == null) + continue; // dropped + + ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId); + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the replay position + if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position)) + { + if (newMutation == null) + newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); + newMutation.add(update); + clr.replayedCount.incrementAndGet(); + } + } + if (newMutation != null) + { + assert !newMutation.isEmpty(); + + try + { + Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation)); + } + catch (ExecutionException e) + { - Throwables.propagate(e.getCause()); ++ throw Throwables.propagate(e.getCause()); + } + + clr.keyspacesRecovered.add(keyspace); + } + } + }; + return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize); + } + } + CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter) { this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();