Merge branch 'cassandra-3.0' into cassandra-3.3

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5669c696
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5669c696
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5669c696

Branch: refs/heads/trunk
Commit: 5669c6967bbdd540f27aeebf5a2c258bc4defbe3
Parents: de24ee9 b9bdd9e
Author: Carl Yeksigian <c...@apache.org>
Authored: Wed Feb 3 09:44:01 2016 -0500
Committer: Carl Yeksigian <c...@apache.org>
Committed: Wed Feb 3 09:44:01 2016 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Mutation.java                    | 2 +-
 src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java | 2 +-
 src/java/org/apache/cassandra/service/paxos/PaxosState.java       | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5669c696/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5669c696/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 985a036,7169b2f..e0518a9
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -91,74 -85,6 +91,74 @@@ public class CommitLogReplaye
      private final ReplayFilter replayFilter;
      private final CommitLogArchiver archiver;
  
 +    /*
 +     * Wrapper around initiating mutations read from the log to make it possible
 +     * to spy on initiated mutations for test
 +     */
 +    @VisibleForTesting
 +    public static class MutationInitiator
 +    {
 +        protected Future<Integer> initiateMutation(final Mutation mutation,
 +                                                   final long segmentId,
 +                                                   final int serializedSize,
 +                                                   final long entryLocation,
 +                                                   final CommitLogReplayer clr)
 +        {
 +            Runnable runnable = new WrappedRunnable()
 +            {
 +                public void runMayThrow()
 +                {
 +                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 +                        return;
 +                    if (clr.pointInTimeExceeded(mutation))
 +                        return;
 +
 +                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +
 +                    // Rebuild the mutation, omitting column families that
 +                    //    a) the user has requested that we ignore,
 +                    //    b) have already been flushed,
 +                    // or c) are part of a cf that was dropped.
 +                    // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
 +                    Mutation newMutation = null;
 +                    for (PartitionUpdate update : clr.replayFilter.filter(mutation))
 +                    {
 +                        if (Schema.instance.getCF(update.metadata().cfId) == null)
 +                            continue; // dropped
 +
 +                        ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId);
 +
 +                        // replay if current segment is newer than last flushed one or,
 +                        // if it is the last known segment, if we are after the replay position
 +                        if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
 +                        {
 +                            if (newMutation == null)
 +                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 +                            newMutation.add(update);
 +                            clr.replayedCount.incrementAndGet();
 +                        }
 +                    }
 +                    if (newMutation != null)
 +                    {
 +                        assert !newMutation.isEmpty();
 +
 +                        try
 +                        {
 +                            Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
 +                        }
 +                        catch (ExecutionException e)
 +                        {
-                             Throwables.propagate(e.getCause());
++                            throw Throwables.propagate(e.getCause());
 +                        }
 +
 +                        clr.keyspacesRecovered.add(keyspace);
 +                    }
 +                }
 +            };
 +            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
 +        }
 +    }
 +
      CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
      {
          this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();

Reply via email to