Author: jbellis
Date: Wed Feb 3 02:22:16 2010
New Revision: 905869
URL: http://svn.apache.org/viewvc?rev=905869&view=rev
Log:
remove SSTableReader.openedFiles; clean up decorated key iteration
patch by jbellis; reviewed by stuhood for CASSANDRA-753
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=905869&r1=905868&r2=905869&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Feb 3 02:22:16 2010
@@ -190,7 +190,7 @@
double v = DatabaseDescriptor.getRowsCachedFraction(table,
columnFamilyName);
int cacheSize;
if (0 < v && v < 1)
- cacheSize = Math.max(1, (int)(v *
SSTableReader.estimatedKeys(columnFamilyName)));
+ cacheSize = Math.max(1, (int)(v * ssTables_.estimatedKeys()));
else
cacheSize = (int)v;
if (logger_.isDebugEnabled())
@@ -1164,7 +1164,7 @@
public static Iterable<ColumnFamilyStore> all()
{
- Iterable<ColumnFamilyStore>[] stores =
(Iterable<ColumnFamilyStore>[])new Object[0];
+ Iterable<ColumnFamilyStore>[] stores = new
Iterable[DatabaseDescriptor.getTables().size()];
int i = 0;
for (Table table : Table.all())
{
@@ -1173,6 +1173,18 @@
return Iterables.concat(stores);
}
+ public Iterable<SSTable.KeyPosition> allIndexPositions()
+ {
+ Collection<SSTableReader> sstables = getSSTables();
+ Iterable<SSTable.KeyPosition>[] positions = new
Iterable[sstables.size()];
+ int i = 0;
+ for (SSTableReader sstable: sstables)
+ {
+ positions[i++] = sstable.getIndexPositions();
+ }
+ return Iterables.concat(positions);
+ }
+
/**
* for testing. no effort is made to clear historical memtables.
*/
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=905869&r1=905868&r2=905869&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed
Feb 3 02:22:16 2010
@@ -178,7 +178,7 @@
* in the data file. Binary search is performed on a list of these objects
* to lookup keys within the SSTable data file.
*/
- class KeyPosition implements Comparable<KeyPosition>
+ public class KeyPosition implements Comparable<KeyPosition>
{
public final DecoratedKey key;
public final long position;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=905869&r1=905868&r2=905869&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
Wed Feb 3 02:22:16 2010
@@ -53,8 +53,6 @@
{
private static final Logger logger = Logger.getLogger(SSTableReader.class);
- private static final FileSSTableMap openedFiles = new FileSSTableMap();
-
// `finalizers` is required to keep the PhantomReferences alive after the
enclosing SSTR is itself
// unreferenced. otherwise they will never get enqueued.
private static final Set<Reference<SSTableReader>> finalizers = new
HashSet<Reference<SSTableReader>>();
@@ -96,11 +94,6 @@
return INDEX_INTERVAL;
}
- public static long getApproximateKeyCount()
- {
- return getApproximateKeyCount(openedFiles.values());
- }
-
public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
{
long count = 0;
@@ -116,60 +109,6 @@
return count;
}
- public static int estimatedKeys(String columnFamilyName)
- {
- int n = 0;
- for (SSTableReader sstable : openedFiles.values())
- {
- if (sstable.getColumnFamilyName().equals(columnFamilyName))
- n += sstable.getIndexPositions().size() * INDEX_INTERVAL;
- }
- return n;
- }
-
- /**
- * Get all indexed keys defined by the two predicates.
- * @param cfpred A Predicate defining matching column families.
- * @param dkpred A Predicate defining matching DecoratedKeys.
- */
- public static List<DecoratedKey>
getIndexedDecoratedKeysFor(Predicate<SSTable> cfpred, Predicate<DecoratedKey>
dkpred)
- {
- List<DecoratedKey> indexedKeys = new ArrayList<DecoratedKey>();
-
- for (SSTableReader sstable : openedFiles.values())
- {
- if (!cfpred.apply(sstable))
- continue;
- for (KeyPosition kp : sstable.getIndexPositions())
- {
- if (dkpred.apply(kp.key))
- {
- indexedKeys.add(kp.key);
- }
- }
- }
- Collections.sort(indexedKeys);
-
- return indexedKeys;
- }
-
- /**
- * Get all indexed keys in any SSTable for our primary range.
- */
- public static List<DecoratedKey> getIndexedDecoratedKeys()
- {
- final Range range = StorageService.instance.getLocalPrimaryRange();
-
- Predicate<SSTable> cfpred = Predicates.alwaysTrue();
- return getIndexedDecoratedKeysFor(cfpred, new Predicate<DecoratedKey>()
- {
- public boolean apply(DecoratedKey dk)
- {
- return range.contains(dk.token);
- }
- });
- }
-
public static SSTableReader open(String dataFileName) throws IOException
{
return open(dataFileName,
@@ -180,7 +119,6 @@
public static SSTableReader open(String dataFileName, IPartitioner
partitioner, double keysCacheFraction) throws IOException
{
assert partitioner != null;
- assert openedFiles.get(dataFileName) == null;
long start = System.currentTimeMillis();
SSTableReader sstable = new SSTableReader(dataFileName, partitioner);
@@ -260,7 +198,6 @@
this.bf = bloomFilter;
phantomReference = new SSTableDeletingReference(this, finalizerQueue);
finalizers.add(phantomReference);
- openedFiles.put(filename, this);
this.keyCache = keyCache;
}
@@ -512,7 +449,6 @@
{
if (logger.isDebugEnabled())
logger.debug("Marking " + path + " compacted");
- openedFiles.remove(path);
if (!new File(compactedFilename()).createNewFile())
{
throw new IOException("Unable to create compaction marker");
@@ -526,16 +462,6 @@
bf = BloomFilter.alwaysMatchingBloomFilter();
}
- static void reopenUnsafe() throws IOException // testing only
- {
- Collection<SSTableReader> sstables = new
ArrayList<SSTableReader>(openedFiles.values());
- openedFiles.clear();
- for (SSTableReader sstable : sstables)
- {
- SSTableReader.open(sstable.path, sstable.partitioner, 0.01);
- }
- }
-
public IPartitioner getPartitioner()
{
return partitioner;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java?rev=905869&r1=905868&r2=905869&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java
Wed Feb 3 02:22:16 2010
@@ -92,4 +92,14 @@
{
sstables = Collections.emptySet();
}
+
+ public long estimatedKeys()
+ {
+ long n = 0;
+ for (SSTableReader sstable : this)
+ {
+ n += sstable.getIndexPositions().size() *
SSTableReader.INDEX_INTERVAL;
+ }
+ return n;
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=905869&r1=905868&r2=905869&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Wed Feb 3 02:22:16 2010
@@ -320,7 +320,7 @@
*/
public static class Validator implements IValidator, Callable<Object>
{
- public final CFPair cf;
+ public final CFPair cf; // TODO keep a CFS reference as a field
instead of its string representation
public final MerkleTree tree;
// the minimum token sorts first, but falls into the last range
@@ -331,7 +331,6 @@
private transient MerkleTree.TreeRange range;
private transient MerkleTree.TreeRangeIterator ranges;
- public final static Predicate<DecoratedKey> DKPRED =
Predicates.alwaysTrue();
public final static MerkleTree.RowHash EMPTY_ROW = new
MerkleTree.RowHash(null, new byte[0]);
Validator(CFPair cf)
@@ -356,14 +355,21 @@
public void prepare()
{
- Predicate<SSTable> cfpred = new Predicate<SSTable>()
+ List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ ColumnFamilyStore cfs;
+ try
{
- public boolean apply(SSTable ss)
- {
- return cf.left.equals(ss.getTableName()) &&
cf.right.equals(ss.getColumnFamilyName());
- }
- };
- List<DecoratedKey> keys =
SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED);
+ cfs = Table.open(cf.left).getColumnFamilyStore(cf.right);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ if (cfs != null) // TODO test w/ valid CF definitions, this if{}
shouldn't be necessary
+ {
+ for (SSTable.KeyPosition info: cfs.allIndexPositions())
+ keys.add(info.key);
+ }
if (keys.isEmpty())
{
@@ -737,8 +743,7 @@
ObjectInputStream ois = new ObjectInputStream(dis);
try
{
- Validator v = new Validator(cf, (MerkleTree)ois.readObject());
- return v;
+ return new Validator(cf, (MerkleTree)ois.readObject());
}
catch(Exception e)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=905869&r1=905868&r2=905869&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Feb 3 02:22:16 2010
@@ -34,6 +34,7 @@
import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
@@ -1211,8 +1212,18 @@
List<String> tokens = new ArrayList<String>();
tokens.add(range.left().toString());
- List<DecoratedKey> decoratedKeys =
SSTableReader.getIndexedDecoratedKeys();
- if (decoratedKeys.size() < splits)
+ List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ {
+ for (SSTable.KeyPosition info: cfs.allIndexPositions())
+ {
+ if (range.contains(info.key.token))
+ keys.add(info.key);
+ }
+ }
+ Collections.sort(keys);
+
+ if (keys.size() < splits)
{
// not enough keys to generate good splits -- generate random ones
instead
// (since this only happens when we don't have many keys, it
doesn't really matter that the splits are poor)
@@ -1225,8 +1236,8 @@
{
for (int i = 1; i < splits; i++)
{
- int index = i * (decoratedKeys.size() / splits);
- tokens.add(decoratedKeys.get(index).token.toString());
+ int index = i * (keys.size() / splits);
+ tokens.add(keys.get(index).token.toString());
}
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=905869&r1=905868&r2=905869&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
Wed Feb 3 02:22:16 2010
@@ -45,7 +45,7 @@
// verify
verifySingle(ssTable, bytes, key);
- SSTableReader.reopenUnsafe(); // force reloading the index
+ ssTable = SSTableReader.open(ssTable.path); // read the index from disk
verifySingle(ssTable, bytes, key);
}
@@ -73,7 +73,7 @@
// verify
verifyMany(ssTable, map);
- SSTableReader.reopenUnsafe(); // force reloading the index
+ ssTable = SSTableReader.open(ssTable.path); // read the index from disk
verifyMany(ssTable, map);
}
@@ -92,34 +92,4 @@
assert Arrays.equals(bytes2, map.get(key));
}
}
-
- @Test
- public void testGetIndexedDecoratedKeysFor() throws IOException {
- final String ssname = "indexedkeys";
-
- int numkeys = 1000;
- TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
- for ( int i = 0; i < numkeys; i++ )
- {
- map.put(Integer.toString(i), "blah".getBytes());
- }
-
- // write
- SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", ssname,
map);
-
- // verify
- Predicate<SSTable> cfpred;
- Predicate<DecoratedKey> dkpred;
-
- cfpred = new Predicate<SSTable>() {
- public boolean apply(SSTable ss)
- {
- return ss.getColumnFamilyName().equals(ssname);
- }
- };
- dkpred = Predicates.alwaysTrue();
- int actual = SSTableReader.getIndexedDecoratedKeysFor(cfpred,
dkpred).size();
- assert 0 < actual;
- assert actual <=
Math.ceil((double)numkeys/SSTableReader.indexInterval());
- }
}