Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d572ab0a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d572ab0a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d572ab0a

Branch: refs/heads/trunk
Commit: d572ab0ac86b4e635c2dd83e56926cc6192f1c2a
Parents: 9904c62 d6ffa4b
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue May 24 07:42:07 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue May 24 07:42:07 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../db/compaction/CompactionManager.java        |  2 +-
 .../cassandra/service/ActiveRepairService.java  | 59 ++++++++++-------
 .../service/ActiveRepairServiceTest.java        | 66 +++++++++++++++++++-
 4 files changed, 102 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 608d6c9,af97cd1..69e8c5d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,7 +1,38 @@@
 -2.2.7
 - * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 +3.0.7
 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
 + * Allow compaction strategies to disable early open (CASSANDRA-11754)
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
 + * Add message dropped tasks to nodetool netstats (CASSANDRA-11855)
   * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 - * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
++Merged from 2.1:
++ * Avoid holding SSTableReaders for duration of incremental repair 
(CASSANDRA-11739)
 +
 +
 +3.0.6
 + * Disallow creating view with a static column (CASSANDRA-11602)
 + * Reduce the amount of object allocations caused by the getFunctions methods 
(CASSANDRA-11593)
 + * Potential error replaying commitlog with smallint/tinyint/date/time types 
(CASSANDRA-11618)
 + * Fix queries with filtering on counter columns (CASSANDRA-11629)
 + * Improve tombstone printing in sstabledump (CASSANDRA-11655)
 + * Fix paging for range queries where all clustering columns are specified 
(CASSANDRA-11669)
 + * Don't require HEAP_NEW_SIZE to be set when using G1 (CASSANDRA-11600)
 + * Fix sstabledump not showing cells after tombstone marker (CASSANDRA-11654)
 + * Ignore all LocalStrategy keyspaces for streaming and other related
 +   operations (CASSANDRA-11627)
 + * Ensure columnfilter covers indexed columns for thrift 2i queries 
(CASSANDRA-11523)
 + * Only open one sstable scanner per sstable (CASSANDRA-11412)
 + * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410)
 + * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485)
 + * LogAwareFileLister should only use OLD sstable files in current folder to 
determine disk consistency (CASSANDRA-11470)
 + * Notify indexers of expired rows during compaction (CASSANDRA-11329)
 + * Properly respond with ProtocolError when a v1/v2 native protocol
 +   header is received (CASSANDRA-11464)
 + * Validate that num_tokens and initial_token are consistent with one another 
(CASSANDRA-10120)
 +Merged from 2.2:
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
   * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
   * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
   * Produce a heap dump when exiting on OOM (CASSANDRA-9861)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 6c75149,1ea5aaf..5aac886
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -446,39 -448,50 +446,50 @@@ public class ActiveRepairServic
              }
              this.ranges = ranges;
              this.repairedAt = repairedAt;
 -            this.isGlobal = isGlobal;
              this.isIncremental = isIncremental;
 +            this.isGlobal = isGlobal;
          }
  
-         public void addSSTables(UUID cfId, Set<SSTableReader> sstables)
-         {
-             sstableMap.get(cfId).addAll(sstables);
-         }
- 
          @SuppressWarnings("resource")
-         public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID 
cfId)
+         public synchronized Refs<SSTableReader> 
getActiveRepairedSSTableRefs(UUID cfId)
          {
-             Set<SSTableReader> sstables = sstableMap.get(cfId);
-             Iterator<SSTableReader> sstableIterator = sstables.iterator();
              ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> 
references = ImmutableMap.builder();
-             while (sstableIterator.hasNext())
+             for (SSTableReader sstable : getActiveSSTables(cfId))
              {
-                 SSTableReader sstable = sstableIterator.next();
-                 if (!new 
File(sstable.descriptor.filenameFor(Component.DATA)).exists())
-                 {
-                     sstableIterator.remove();
-                 }
+                 Ref<SSTableReader> ref = sstable.tryRef();
+                 if (ref == null)
+                     sstableMap.get(cfId).remove(sstable.getFilename());
                  else
+                     references.put(sstable, ref);
+             }
+             return new Refs<>(references.build());
+         }
+ 
+         private Set<SSTableReader> getActiveSSTables(UUID cfId)
+         {
+             Set<String> repairedSSTables = sstableMap.get(cfId);
+             Set<SSTableReader> activeSSTables = new HashSet<>();
+             Set<String> activeSSTableNames = new HashSet<>();
+             for (SSTableReader sstable : 
columnFamilyStores.get(cfId).getSSTables())
+             {
+                 if (repairedSSTables.contains(sstable.getFilename()))
                  {
-                     Ref<SSTableReader> ref = sstable.tryRef();
-                     if (ref == null)
-                         sstableIterator.remove();
-                     else
-                         references.put(sstable, ref);
+                     activeSSTables.add(sstable);
+                     activeSSTableNames.add(sstable.getFilename());
                  }
              }
-             return new Refs<>(references.build());
+             sstableMap.put(cfId, activeSSTableNames);
+             return activeSSTables;
          }
+ 
+         public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
+         {
+             for (SSTableReader sstable : sstables)
+             {
+                 sstableMap.get(cfId).add(sstable.getFilename());
+             }
+         }
+ 
          public long getRepairedAt()
          {
              if (isGlobal)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index a61a33e,b4066d7..bd761db
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -27,18 -28,27 +27,23 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.Util;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
 -import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.locator.TokenMetadata;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Refs;
  
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
  
  public class ActiveRepairServiceTest
  {
@@@ -57,9 -67,10 +62,9 @@@
      {
          SchemaLoader.prepareServer();
          SchemaLoader.createKeyspace(KEYSPACE5,
 -                                    SimpleStrategy.class,
 -                                    KSMetaData.optsWithRF(2),
 +                                    KeyspaceParams.simple(2),
                                      SchemaLoader.standardCFMD(KEYSPACE5, 
CF_COUNTER),
-                                     SchemaLoader.standardCFMD(KEYSPACE5, 
CF_STANDRAD1));
+                                     SchemaLoader.standardCFMD(KEYSPACE5, 
CF_STANDARD1));
      }
  
      @Before
@@@ -213,4 -224,66 +218,61 @@@
          }
          return endpoints;
      }
+ 
+     @Test
+     public void testGetActiveRepairedSSTableRefs()
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Set<SSTableReader> original = store.getUnrepairedSSTables();
++        Set<SSTableReader> original = store.getLiveSSTables();
+ 
+         UUID prsId = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, 
Collections.singletonList(store), null, true, false);
++        ActiveRepairService.instance.registerParentRepairSession(prsId, 
Collections.singletonList(store), null, true, 0, false);
+         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
+ 
+         //add all sstables to parent repair session
+         prs.addSSTables(store.metadata.cfId, original);
+ 
+         //retrieve all sstable references from parent repair sessions
+         Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+         Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+         assertEquals(original, retrieved);
+         refs.release();
+ 
+         //remove 1 sstable from data data tracker
+         Set<SSTableReader> newLiveSet = new HashSet<>(original);
+         Iterator<SSTableReader> it = newLiveSet.iterator();
+         final SSTableReader removed = it.next();
+         it.remove();
 -        store.getTracker().dropSSTables(new Predicate<SSTableReader>()
++        store.getTracker().dropSSTables(new 
com.google.common.base.Predicate<SSTableReader>()
+         {
+             public boolean apply(SSTableReader reader)
+             {
+                 return removed.equals(reader);
+             }
+         }, OperationType.COMPACTION, null);
+ 
+         //retrieve sstable references from parent repair session again - 
removed sstable must not be present
+         refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+         retrieved = Sets.newHashSet(refs.iterator());
+         assertEquals(newLiveSet, retrieved);
+         assertFalse(retrieved.contains(removed));
+         refs.release();
+     }
+ 
+     private ColumnFamilyStore prepareColumnFamilyStore()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE5);
+         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1);
+         store.disableAutoCompaction();
 -        long timestamp = System.currentTimeMillis();
 -        //create 10 sstables
+         for (int i = 0; i < 10; i++)
+         {
 -            DecoratedKey key = Util.dk(Integer.toString(i));
 -            Mutation rm = new Mutation(KEYSPACE5, key.getKey());
 -            for (int j = 0; j < 10; j++)
 -                rm.add("Standard1", Util.cellname(Integer.toString(j)),
 -                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
 -                       timestamp,
 -                       0);
 -            rm.apply();
 -            store.forceBlockingFlush();
++            new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), 
Integer.toString(i))
++            .clustering("c")
++            .add("val", "val")
++            .build()
++            .applyUnsafe();
+         }
++        store.forceBlockingFlush();
+         return store;
+     }
  }

Reply via email to