Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/io/sstable/SSTable.java
test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/306de09a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/306de09a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/306de09a
Branch: refs/heads/trunk
Commit: 306de09af024a516548eb1d2019a5bcff5327f59
Parents: ca07614 1fab099
Author: Marcus Eriksson <[email protected]>
Authored: Wed Apr 15 12:11:11 2015 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Wed Apr 15 12:11:11 2015 +0200
----------------------------------------------------------------------
.../cassandra/io/sstable/SSTableReader.java | 11 +++++++++--
.../LeveledCompactionStrategyTest.java | 19 +++++++++++++------
2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/306de09a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c73d4a1,15808e8..43873a0
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -600,37 -369,24 +600,44 @@@ public class SSTableReader extends SSTa
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
+ this.setup(false);
}
- /**
- * Clean up all opened resources.
- *
- * @throws IOException
- */
- public void close() throws IOException
+ public static long getTotalBytes(Iterable<SSTableReader> sstables)
+ {
+ long sum = 0;
+ for (SSTableReader sstable : sstables)
- {
+ sum += sstable.onDiskLength();
- }
++ return sum;
++ }
++
++ public static long getTotalUncompressedBytes(Iterable<SSTableReader>
sstables)
+ {
- if (readMeterSyncFuture != null)
- readMeterSyncFuture.cancel(false);
++ long sum = 0;
++ for (SSTableReader sstable : sstables)
++ sum += sstable.uncompressedLength();
+
- // Force finalizing mmapping if necessary
- ifile.cleanup();
- dfile.cleanup();
- // close the BF so it can be opened later.
- bf.close();
- indexSummary.close();
+ return sum;
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof SSTableReader && ((SSTableReader)
that).descriptor.equals(this.descriptor);
+ }
+
+ public int hashCode()
+ {
+ return this.descriptor.hashCode();
+ }
+
+ public String getFilename()
+ {
+ return dfile.path;
+ }
+
+ public String getIndexFilename()
+ {
+ return ifile.path;
}
public void setTrackedBy(DataTracker tracker)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/306de09a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc
test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 4c2236b,33de6f5..da54eee
---
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -20,11 -20,13 +20,12 @@@ package org.apache.cassandra.db.compact
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
+ import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.UUID;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@@ -76,7 -60,13 +77,9 @@@ public class LeveledCompactionStrategyT
@Test
public void testValidationMultipleSSTablePerLevel() throws Exception
{
- ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB
value, make it easy to have multiple files
- String ksname = "Keyspace1";
- String cfname = "StandardLeveled";
- Keyspace keyspace = Keyspace.open(ksname);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ byte [] b = new byte[100 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b); // 100 KB value, make it easy
to have multiple files
// Enough data to have a level 1 and 2
int rows = 20;
@@@ -96,16 -86,14 +99,16 @@@
}
waitForLeveling(cfs);
- LeveledCompactionStrategy strategy = (LeveledCompactionStrategy)
cfs.getCompactionStrategy();
+ WrappingCompactionStrategy strategy = (WrappingCompactionStrategy)
cfs.getCompactionStrategy();
// Checking we're not completely bad at math
- assert strategy.getSSTableCountPerLevel()[1] > 0;
- assert strategy.getSSTableCountPerLevel()[2] > 0;
- assertTrue(strategy.getLevelSize(1) > 0);
- assertTrue(strategy.getLevelSize(2) > 0);
++ assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
++ assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
- Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
+ Range<Token> range = new Range<>(Util.token(""), Util.token(""));
int gcBefore =
keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
- RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), ksname,
cfname, range);
+ UUID parentRepSession = UUID.randomUUID();
+
ActiveRepairService.instance.registerParentRepairSession(parentRepSession,
Arrays.asList(cfs), Arrays.asList(range));
+ RepairJobDesc desc = new RepairJobDesc(parentRepSession,
UUID.randomUUID(), ksname, cfname, range);
Validator validator = new Validator(desc,
FBUtilities.getBroadcastAddress(), gcBefore);
CompactionManager.instance.submitValidation(cfs, validator).get();
}
@@@ -124,8 -112,15 +127,10 @@@
@Test
public void testCompactionProgress() throws Exception
{
- String ksname = "Keyspace1";
- String cfname = "StandardLeveled";
- Keyspace keyspace = Keyspace.open(ksname);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-
// make sure we have SSTables in L1
- ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]);
+ byte [] b = new byte[100 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b);
int rows = 2;
int columns = 10;
for (int r = 0; r < rows; r++)
@@@ -154,7 -149,7 +159,7 @@@
scanner.next();
// scanner.getCurrentPosition should be equal to total bytes of L1
sstables
- assert scanner.getCurrentPosition() ==
SSTableReader.getTotalBytes(sstables);
- assertEquals(scanner.getCurrentPosition(),
SSTable.getTotalUncompressedBytes(sstables));
++ assertEquals(scanner.getCurrentPosition(),
SSTableReader.getTotalUncompressedBytes(sstables));
}
@Test
@@@ -202,73 -202,4 +207,75 @@@
// verify that the manifest has correct amount of sstables
assertEquals(cfs.getSSTables().size(), levels[6]);
}
+
+ @Test
+ public void testNewRepairedSSTable() throws Exception
+ {
- ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB
value, make it easy to have multiple files
++ byte [] b = new byte[100 * 1024];
++ new Random().nextBytes(b);
++ ByteBuffer value = ByteBuffer.wrap(b); // 100 KB value, make it easy
to have multiple files
+
+ // Enough data to have a level 1 and 2
+ int rows = 20;
+ int columns = 10;
+
+ // Adds enough data to trigger multiple sstable per level
+ for (int r = 0; r < rows; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ Mutation rm = new Mutation(ksname, key.getKey());
+ for (int c = 0; c < columns; c++)
+ {
+ rm.add(cfname, Util.cellname("column" + c), value, 0);
+ }
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+ waitForLeveling(cfs);
+ cfs.disableAutoCompaction();
+
+ while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
+ Thread.sleep(100);
+
+ WrappingCompactionStrategy strategy = (WrappingCompactionStrategy)
cfs.getCompactionStrategy();
+ List<AbstractCompactionStrategy> strategies =
strategy.getWrappedStrategies();
+ LeveledCompactionStrategy repaired = (LeveledCompactionStrategy)
strategies.get(0);
+ LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy)
strategies.get(1);
+ assertEquals(0, repaired.manifest.getLevelCount() );
+ assertEquals(2, unrepaired.manifest.getLevelCount());
+ assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
+ assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
+
+ for (SSTableReader sstable : cfs.getSSTables())
+ assertFalse(sstable.isRepaired());
+
+ int sstableCount = 0;
+ for (List<SSTableReader> level : unrepaired.manifest.generations)
+ sstableCount += level.size();
+ // we only have unrepaired sstables:
+ assertEquals(sstableCount, cfs.getSSTables().size());
+
+ SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
+ SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
+
+
sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor,
System.currentTimeMillis());
+ sstable1.reloadSSTableMetadata();
+ assertTrue(sstable1.isRepaired());
+
+ strategy.handleNotification(new
SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
+
+ int repairedSSTableCount = 0;
+ for (List<SSTableReader> level : repaired.manifest.generations)
+ repairedSSTableCount += level.size();
+ assertEquals(1, repairedSSTableCount);
+ // make sure the repaired sstable ends up in the same level in the
repaired manifest:
+ assertTrue(repaired.manifest.generations[2].contains(sstable1));
+ // and that it is gone from unrepaired
+ assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
+
+ unrepaired.removeSSTable(sstable2);
+ strategy.handleNotification(new SSTableAddedNotification(sstable2),
this);
+ assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
+ assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
+ }
}