Merge branch 'cassandra-2.2' into cassandra-3.0

Conflicts:
        src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
        src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
        src/java/org/apache/cassandra/service/ActiveRepairService.java
        test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
        test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f490ccec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f490ccec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f490ccec

Branch: refs/heads/trunk
Commit: f490ccec62b2b86b9dbf0fff3888852aedbc7f65
Parents: 8afc76a 842f150
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Aug 25 19:05:23 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Aug 25 19:05:34 2015 +0200

----------------------------------------------------------------------
 .../repair/RepairMessageVerbHandler.java        |  5 +-
 .../apache/cassandra/repair/RepairRunnable.java |  2 +-
 .../repair/messages/PrepareMessage.java         | 10 +++-
 .../cassandra/repair/messages/RepairOption.java |  4 ++
 .../cassandra/service/ActiveRepairService.java  | 49 +++++++++++++-------
 .../LeveledCompactionStrategyTest.java          |  2 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |  2 +-
 7 files changed, 51 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 942d902,796f135..ffba9d6
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -73,7 -82,7 +73,8 @@@ public class RepairMessageVerbHandler i
                              columnFamilyStores,
                              prepareMessage.ranges,
                              prepareMessage.isIncremental,
-                             prepareMessage.timestamp);
 -                            isGlobal);
++                            prepareMessage.timestamp,
++                            prepareMessage.isGlobal);
                      MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                      break;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 0cd73db,a57c27e..8909f1b
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@@ -40,16 -40,14 +40,18 @@@ public class PrepareMessage extends Rep
  
      public final UUID parentRepairSession;
      public final boolean isIncremental;
 +    public final long timestamp;
++    public final boolean isGlobal;
  
-     public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, 
Collection<Range<Token>> ranges, boolean isIncremental, long timestamp)
 -    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, 
Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
++    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, 
Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean 
isGlobal)
      {
 -        super(isGlobal ? Type.PREPARE_GLOBAL_MESSAGE : Type.PREPARE_MESSAGE, 
null);
 +        super(Type.PREPARE_MESSAGE, null);
          this.parentRepairSession = parentRepairSession;
          this.cfIds = cfIds;
          this.ranges = ranges;
          this.isIncremental = isIncremental;
 +        this.timestamp = timestamp;
++        this.isGlobal = isGlobal;
      }
  
      public static class PrepareMessageSerializer implements 
MessageSerializer<PrepareMessage>
@@@ -67,10 -65,9 +69,11 @@@
                  Range.tokenSerializer.serialize(r, out, version);
              }
              out.writeBoolean(message.isIncremental);
 +            out.writeLong(message.timestamp);
++            out.writeBoolean(message.isGlobal);
          }
  
 -        public PrepareMessage deserialize(DataInput in, int version) throws 
IOException
 +        public PrepareMessage deserialize(DataInputPlus in, int version) 
throws IOException
          {
              int cfIdCount = in.readInt();
              List<UUID> cfIds = new ArrayList<>(cfIdCount);
@@@ -82,8 -79,8 +85,9 @@@
              for (int i = 0; i < rangeCount; i++)
                  ranges.add((Range<Token>) 
Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), 
version));
              boolean isIncremental = in.readBoolean();
 -
 -            return new PrepareMessage(parentRepairSession, cfIds, ranges, 
isIncremental, false);
 +            long timestamp = in.readLong();
-             return new PrepareMessage(parentRepairSession, cfIds, ranges, 
isIncremental, timestamp);
++            boolean isGlobal = in.readBoolean();
++            return new PrepareMessage(parentRepairSession, cfIds, ranges, 
isIncremental, timestamp, isGlobal);
          }
  
          public long serializedSize(PrepareMessage message, int version)
@@@ -93,11 -91,10 +97,12 @@@
              for (UUID cfId : message.cfIds)
                  size += UUIDSerializer.serializer.serializedSize(cfId, 
version);
              size += 
UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
 -            size += sizes.sizeof(message.ranges.size());
 +            size += TypeSizes.sizeof(message.ranges.size());
              for (Range<Token> r : message.ranges)
                  size += Range.tokenSerializer.serializedSize(r, version);
 -            size += sizes.sizeof(message.isIncremental);
 +            size += TypeSizes.sizeof(message.isIncremental);
 +            size += TypeSizes.sizeof(message.timestamp);
++            size += TypeSizes.sizeof(message.isGlobal);
              return size;
          }
      }
@@@ -110,7 -107,6 +115,8 @@@
                  ", ranges=" + ranges +
                  ", parentRepairSession=" + parentRepairSession +
                  ", isIncremental="+isIncremental +
 +                ", timestamp=" + timestamp +
++                ", isGlobal=" + isGlobal +
                  '}';
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 0e09cf7,a6389ea..8079b3a
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -233,8 -237,7 +237,8 @@@ public class ActiveRepairServic
  
      public synchronized UUID prepareForRepair(UUID parentRepairSession, 
Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> 
columnFamilyStores)
      {
 -        registerParentRepairSession(parentRepairSession, columnFamilyStores, 
options.getRanges(), options.isIncremental(), options.isGlobal());
 +        long timestamp = System.currentTimeMillis();
-         registerParentRepairSession(parentRepairSession, columnFamilyStores, 
options.getRanges(), options.isIncremental(), timestamp);
++        registerParentRepairSession(parentRepairSession, columnFamilyStores, 
options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
          final CountDownLatch prepareLatch = new 
CountDownLatch(endpoints.size());
          final AtomicBoolean status = new AtomicBoolean(true);
          final Set<String> failedNodes = Collections.synchronizedSet(new 
HashSet<String>());
@@@ -264,7 -267,10 +268,7 @@@
  
          for (InetAddress neighbour : endpoints)
          {
-             PrepareMessage message = new PrepareMessage(parentRepairSession, 
cfIds, options.getRanges(), options.isIncremental(), timestamp);
 -            CassandraVersion peerVersion = 
SystemKeyspace.getReleaseVersion(neighbour);
 -            boolean isGlobal = options.isGlobal() && peerVersion != null && 
peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
 -            logger.debug("Sending prepare message: options.isGlobal = {}, 
peerVersion = {}", options.isGlobal(), peerVersion);
 -            PrepareMessage message = new PrepareMessage(parentRepairSession, 
cfIds, options.getRanges(), options.isIncremental(), isGlobal);
++            PrepareMessage message = new PrepareMessage(parentRepairSession, 
cfIds, options.getRanges(), options.isIncremental(), timestamp, 
options.isGlobal());
              MessageOut<RepairMessage> msg = message.createMessage();
              MessagingService.instance().sendRR(msg, neighbour, callback, 
TimeUnit.HOURS.toMillis(1), true);
          }
@@@ -287,9 -293,9 +291,9 @@@
          return parentRepairSession;
      }
  
-     public void registerParentRepairSession(UUID parentRepairSession, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental, long timestamp)
 -    public void registerParentRepairSession(UUID parentRepairSession, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental, boolean isGlobal)
++    public void registerParentRepairSession(UUID parentRepairSession, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental, long timestamp, boolean isGlobal)
      {
-         parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, isIncremental, timestamp));
 -        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, 
System.currentTimeMillis()));
++        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, isIncremental, timestamp, 
isGlobal));
      }
  
      public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID 
parentRepairSession)
@@@ -401,16 -413,18 +411,18 @@@
          private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new 
HashMap<>();
          private final Collection<Range<Token>> ranges;
          private final Map<UUID, Set<SSTableReader>> sstableMap = new 
HashMap<>();
-         public final long repairedAt;
+         private final long repairedAt;
          public final boolean isIncremental;
+         private final boolean isGlobal;
  
-         public ParentRepairSession(List<ColumnFamilyStore> 
columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, 
long repairedAt)
 -        public ParentRepairSession(List<ColumnFamilyStore> 
columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, 
boolean isGlobal, long repairedAt)
++        public ParentRepairSession(List<ColumnFamilyStore> 
columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, 
long repairedAt, boolean isGlobal)
          {
              for (ColumnFamilyStore cfs : columnFamilyStores)
                  this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
              this.ranges = ranges;
              this.repairedAt = repairedAt;
 -            this.isGlobal = isGlobal;
              this.isIncremental = isIncremental;
++            this.isGlobal = isGlobal;
          }
  
          public void addSSTables(UUID cfId, Set<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 456dcd1,63fd0e7..8050b6c
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -183,10 -195,10 +183,10 @@@ public class LeveledCompactionStrategyT
          assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
  
          Range<Token> range = new Range<>(Util.token(""), Util.token(""));
 -        int gcBefore = 
keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis());
 +        int gcBefore = 
keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
          UUID parentRepSession = UUID.randomUUID();
-         
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis());
 -        
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
Arrays.asList(cfs), Arrays.asList(range), false, true);
 -        RepairJobDesc desc = new RepairJobDesc(parentRepSession, 
UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
++        
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), 
true);
 +        RepairJobDesc desc = new RepairJobDesc(parentRepSession, 
UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
          Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddress(), gcBefore);
          CompactionManager.instance.submitValidation(cfs, validator).get();
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index ff5b99e,e5c03b9..eec29bc
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@@ -89,13 -89,12 +89,13 @@@ public class LocalSyncTaskTest extends 
          Keyspace keyspace = Keyspace.open(KEYSPACE1);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
  
-         
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis());
 -        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
Arrays.asList(cfs), Arrays.asList(range), false, false);
++        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), 
false);
 +
 +        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
  
 -        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", range);
 +        MerkleTrees tree1 = createInitialTree(desc);
  
 -        MerkleTree tree1 = createInitialTree(desc);
 -        MerkleTree tree2 = createInitialTree(desc);
 +        MerkleTrees tree2 = createInitialTree(desc);
  
          // change a range in one of the trees
          Token token = partirioner.midpoint(range.left, range.right);

Reply via email to