Merge branch 'cassandra-3.0' into cassandra-3.3
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de24ee94 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de24ee94 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de24ee94 Branch: refs/heads/trunk Commit: de24ee94f355042f24f04c3dec8c1c50000e4ec3 Parents: 78fe8c4 5f1e4bc Author: Carl Yeksigian <c...@apache.org> Authored: Wed Feb 3 09:29:20 2016 -0500 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Feb 3 09:29:20 2016 -0500 ---------------------------------------------------------------------- src/java/org/apache/cassandra/db/Mutation.java | 3 ++- .../cassandra/db/commitlog/CommitLogReplayer.java | 14 ++++++++++++-- .../apache/cassandra/service/paxos/PaxosState.java | 3 ++- 3 files changed, 16 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/de24ee94/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/de24ee94/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 55bdf07,33750f8..985a036 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -30,8 -30,8 +30,9 @@@ import java.util.concurrent.Future import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; + import com.google.common.base.Throwables; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; @@@ -90,65 -85,6 +91,74 @@@ public class CommitLogReplaye private final ReplayFilter replayFilter; private final CommitLogArchiver archiver; + /* + * Wrapper around initiating mutations read from the log to make it possible + * to spy on initiated mutations for test + */ + @VisibleForTesting + public static class MutationInitiator + { + protected Future<Integer> initiateMutation(final Mutation mutation, + final long segmentId, + final int serializedSize, + final long entryLocation, + final CommitLogReplayer clr) + { + Runnable runnable = new WrappedRunnable() + { - public void runMayThrow() throws ExecutionException ++ public void runMayThrow() + { + if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + return; + if (clr.pointInTimeExceeded(mutation)) + return; + + final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + + // Rebuild the mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. + Mutation newMutation = null; + for (PartitionUpdate update : clr.replayFilter.filter(mutation)) + { + if (Schema.instance.getCF(update.metadata().cfId) == null) + continue; // dropped + + ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId); + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the replay position + if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position)) + { + if (newMutation == null) + newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); + newMutation.add(update); + clr.replayedCount.incrementAndGet(); + } + } + if (newMutation != null) + { + assert !newMutation.isEmpty(); - Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation)); ++ ++ try ++ { ++ Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation)); ++ } ++ catch (ExecutionException e) ++ { ++ Throwables.propagate(e.getCause()); ++ } ++ + clr.keyspacesRecovered.add(keyspace); + } + } + }; + return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize); + } + } + CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter) { this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();