Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        src/java/org/apache/cassandra/io/sstable/SSTable.java
        
test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/306de09a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/306de09a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/306de09a

Branch: refs/heads/cassandra-2.1
Commit: 306de09af024a516548eb1d2019a5bcff5327f59
Parents: ca07614 1fab099
Author: Marcus Eriksson <[email protected]>
Authored: Wed Apr 15 12:11:11 2015 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Wed Apr 15 12:11:11 2015 +0200

----------------------------------------------------------------------
 .../cassandra/io/sstable/SSTableReader.java      | 11 +++++++++--
 .../LeveledCompactionStrategyTest.java           | 19 +++++++++++++------
 2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/306de09a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c73d4a1,15808e8..43873a0
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -600,37 -369,24 +600,44 @@@ public class SSTableReader extends SSTa
          this.dfile = dfile;
          this.indexSummary = indexSummary;
          this.bf = bloomFilter;
 +        this.setup(false);
      }
  
 -    /**
 -     * Clean up all opened resources.
 -     *
 -     * @throws IOException
 -     */
 -    public void close() throws IOException
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
-         {
 +            sum += sstable.onDiskLength();
-         }
++        return sum;
++    }
++
++    public static long getTotalUncompressedBytes(Iterable<SSTableReader> 
sstables)
+     {
 -        if (readMeterSyncFuture != null)
 -            readMeterSyncFuture.cancel(false);
++        long sum = 0;
++        for (SSTableReader sstable : sstables)
++            sum += sstable.uncompressedLength();
+ 
 -        // Force finalizing mmapping if necessary
 -        ifile.cleanup();
 -        dfile.cleanup();
 -        // close the BF so it can be opened later.
 -        bf.close();
 -        indexSummary.close();
 +        return sum;
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) 
that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
      }
  
      public void setTrackedBy(DataTracker tracker)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/306de09a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc 
test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 4c2236b,33de6f5..da54eee
--- 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -20,11 -20,13 +20,12 @@@ package org.apache.cassandra.db.compact
  import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.Collection;
 -import java.util.HashSet;
  import java.util.List;
+ import java.util.Random;
 -import java.util.Set;
 -import java.util.concurrent.ExecutionException;
  import java.util.UUID;
  
 +import org.junit.After;
 +import org.junit.Before;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  
@@@ -76,7 -60,13 +77,9 @@@ public class LeveledCompactionStrategyT
      @Test
      public void testValidationMultipleSSTablePerLevel() throws Exception
      {
-         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB 
value, make it easy to have multiple files
 -        String ksname = "Keyspace1";
 -        String cfname = "StandardLeveled";
 -        Keyspace keyspace = Keyspace.open(ksname);
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+         byte [] b = new byte[100 * 1024];
+         new Random().nextBytes(b);
+         ByteBuffer value = ByteBuffer.wrap(b); // 100 KB value, make it easy 
to have multiple files
  
          // Enough data to have a level 1 and 2
          int rows = 20;
@@@ -96,16 -86,14 +99,16 @@@
          }
  
          waitForLeveling(cfs);
 -        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) 
cfs.getCompactionStrategy();
 +        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) 
cfs.getCompactionStrategy();
          // Checking we're not completely bad at math
-         assert strategy.getSSTableCountPerLevel()[1] > 0;
-         assert strategy.getSSTableCountPerLevel()[2] > 0;
 -        assertTrue(strategy.getLevelSize(1) > 0);
 -        assertTrue(strategy.getLevelSize(2) > 0);
++        assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
++        assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
  
 -        Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
 +        Range<Token> range = new Range<>(Util.token(""), Util.token(""));
          int gcBefore = 
keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
 -        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), ksname, 
cfname, range);
 +        UUID parentRepSession = UUID.randomUUID();
 +        
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
Arrays.asList(cfs), Arrays.asList(range));
 +        RepairJobDesc desc = new RepairJobDesc(parentRepSession, 
UUID.randomUUID(), ksname, cfname, range);
          Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddress(), gcBefore);
          CompactionManager.instance.submitValidation(cfs, validator).get();
      }
@@@ -124,8 -112,15 +127,10 @@@
      @Test
      public void testCompactionProgress() throws Exception
      {
 -        String ksname = "Keyspace1";
 -        String cfname = "StandardLeveled";
 -        Keyspace keyspace = Keyspace.open(ksname);
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 -
          // make sure we have SSTables in L1
-         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]);
+         byte [] b = new byte[100 * 1024];
+         new Random().nextBytes(b);
+         ByteBuffer value = ByteBuffer.wrap(b);
          int rows = 2;
          int columns = 10;
          for (int r = 0; r < rows; r++)
@@@ -154,7 -149,7 +159,7 @@@
              scanner.next();
  
          // scanner.getCurrentPosition should be equal to total bytes of L1 
sstables
-         assert scanner.getCurrentPosition() == 
SSTableReader.getTotalBytes(sstables);
 -        assertEquals(scanner.getCurrentPosition(), 
SSTable.getTotalUncompressedBytes(sstables));
++        assertEquals(scanner.getCurrentPosition(), 
SSTableReader.getTotalUncompressedBytes(sstables));
      }
  
      @Test
@@@ -202,73 -202,4 +207,75 @@@
          // verify that the manifest has correct amount of sstables
          assertEquals(cfs.getSSTables().size(), levels[6]);
      }
 +
 +    @Test
 +    public void testNewRepairedSSTable() throws Exception
 +    {
-         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB 
value, make it easy to have multiple files
++        byte [] b = new byte[100 * 1024];
++        new Random().nextBytes(b);
++        ByteBuffer value = ByteBuffer.wrap(b); // 100 KB value, make it easy 
to have multiple files
 +
 +        // Enough data to have a level 1 and 2
 +        int rows = 20;
 +        int columns = 10;
 +
 +        // Adds enough data to trigger multiple sstable per level
 +        for (int r = 0; r < rows; r++)
 +        {
 +            DecoratedKey key = Util.dk(String.valueOf(r));
 +            Mutation rm = new Mutation(ksname, key.getKey());
 +            for (int c = 0; c < columns; c++)
 +            {
 +                rm.add(cfname, Util.cellname("column" + c), value, 0);
 +            }
 +            rm.apply();
 +            cfs.forceBlockingFlush();
 +        }
 +        waitForLeveling(cfs);
 +        cfs.disableAutoCompaction();
 +
 +        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
 +            Thread.sleep(100);
 +
 +        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) 
cfs.getCompactionStrategy();
 +        List<AbstractCompactionStrategy> strategies = 
strategy.getWrappedStrategies();
 +        LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) 
strategies.get(0);
 +        LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) 
strategies.get(1);
 +        assertEquals(0, repaired.manifest.getLevelCount() );
 +        assertEquals(2, unrepaired.manifest.getLevelCount());
 +        assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
 +        assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
 +
 +        for (SSTableReader sstable : cfs.getSSTables())
 +            assertFalse(sstable.isRepaired());
 +
 +        int sstableCount = 0;
 +        for (List<SSTableReader> level : unrepaired.manifest.generations)
 +            sstableCount += level.size();
 +        // we only have unrepaired sstables:
 +        assertEquals(sstableCount, cfs.getSSTables().size());
 +
 +        SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
 +        SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
 +
 +        
sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor,
 System.currentTimeMillis());
 +        sstable1.reloadSSTableMetadata();
 +        assertTrue(sstable1.isRepaired());
 +
 +        strategy.handleNotification(new 
SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
 +
 +        int repairedSSTableCount = 0;
 +        for (List<SSTableReader> level : repaired.manifest.generations)
 +            repairedSSTableCount += level.size();
 +        assertEquals(1, repairedSSTableCount);
 +        // make sure the repaired sstable ends up in the same level in the 
repaired manifest:
 +        assertTrue(repaired.manifest.generations[2].contains(sstable1));
 +        // and that it is gone from unrepaired
 +        assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
 +
 +        unrepaired.removeSSTable(sstable2);
 +        strategy.handleNotification(new SSTableAddedNotification(sstable2), 
this);
 +        assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
 +        assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
 +    }
  }

Reply via email to