Merge branch 'cassandra-3.0' into cassandra-3.3

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de24ee94
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de24ee94
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de24ee94

Branch: refs/heads/trunk
Commit: de24ee94f355042f24f04c3dec8c1c50000e4ec3
Parents: 78fe8c4 5f1e4bc
Author: Carl Yeksigian <c...@apache.org>
Authored: Wed Feb 3 09:29:20 2016 -0500
Committer: Carl Yeksigian <c...@apache.org>
Committed: Wed Feb 3 09:29:20 2016 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Mutation.java        |  3 ++-
 .../cassandra/db/commitlog/CommitLogReplayer.java     | 14 ++++++++++++--
 .../apache/cassandra/service/paxos/PaxosState.java    |  3 ++-
 3 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de24ee94/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de24ee94/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 55bdf07,33750f8..985a036
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -30,8 -30,8 +30,9 @@@ import java.util.concurrent.Future
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.zip.CRC32;
  
 +import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Predicate;
+ import com.google.common.base.Throwables;
  import com.google.common.collect.HashMultimap;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Multimap;
@@@ -90,65 -85,6 +91,74 @@@ public class CommitLogReplaye
      private final ReplayFilter replayFilter;
      private final CommitLogArchiver archiver;
  
 +    /*
 +     * Wrapper around initiating mutations read from the log to make it possible
 +     * to spy on initiated mutations for test
 +     */
 +    @VisibleForTesting
 +    public static class MutationInitiator
 +    {
 +        protected Future<Integer> initiateMutation(final Mutation mutation,
 +                                                   final long segmentId,
 +                                                   final int serializedSize,
 +                                                   final long entryLocation,
 +                                                   final CommitLogReplayer clr)
 +        {
 +            Runnable runnable = new WrappedRunnable()
 +            {
-                 public void runMayThrow() throws ExecutionException
++                public void runMayThrow()
 +                {
 +                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 +                        return;
 +                    if (clr.pointInTimeExceeded(mutation))
 +                        return;
 +
 +                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +
 +                    // Rebuild the mutation, omitting column families that
 +                    //    a) the user has requested that we ignore,
 +                    //    b) have already been flushed,
 +                    // or c) are part of a cf that was dropped.
 +                    // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
 +                    Mutation newMutation = null;
 +                    for (PartitionUpdate update : clr.replayFilter.filter(mutation))
 +                    {
 +                        if (Schema.instance.getCF(update.metadata().cfId) == null)
 +                            continue; // dropped
 +
 +                        ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId);
 +
 +                        // replay if current segment is newer than last flushed one or,
 +                        // if it is the last known segment, if we are after the replay position
 +                        if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
 +                        {
 +                            if (newMutation == null)
 +                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 +                            newMutation.add(update);
 +                            clr.replayedCount.incrementAndGet();
 +                        }
 +                    }
 +                    if (newMutation != null)
 +                    {
 +                        assert !newMutation.isEmpty();
-                         Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
++
++                        try
++                        {
++                            Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
++                        }
++                        catch (ExecutionException e)
++                        {
++                            Throwables.propagate(e.getCause());
++                        }
++
 +                        clr.keyspacesRecovered.add(keyspace);
 +                    }
 +                }
 +            };
 +            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
 +        }
 +    }
 +
      CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
      {
          this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();

Reply via email to