Author: jbellis
Date: Wed Sep 22 03:42:00 2010
New Revision: 999743
URL: http://svn.apache.org/viewvc?rev=999743&view=rev
Log:
Remove IKeyIterator and move the ICompactionInfo implementation into
Table.IndexBuilder.
Patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415.
Removed:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed
Sep 22 03:42:00 2010
@@ -221,7 +221,8 @@ public class ColumnFamilyStore implement
public void buildSecondaryIndexes(Collection<SSTableReader> sstables,
SortedSet<byte[]> columns)
{
logger.debug("Submitting index build to compactionmanager");
- Future future = CompactionManager.instance.submitIndexBuild(this,
columns, new ReducingKeyIterator(sstables));
+ Table.IndexBuilder builder =
Table.open(table).createIndexBuilder(this, columns, new
ReducingKeyIterator(sstables));
+ Future future = CompactionManager.instance.submitIndexBuild(this,
builder);
try
{
future.get();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed
Sep 22 03:42:00 2010
@@ -495,14 +495,14 @@ public class CompactionManager implement
return tablePairs;
}
- public Future submitIndexBuild(final ColumnFamilyStore cfs, final
SortedSet<byte[]> columns, final IKeyIterator iter)
+ public Future submitIndexBuild(final ColumnFamilyStore cfs, final
Table.IndexBuilder builder)
{
Runnable runnable = new Runnable()
{
public void run()
{
- executor.beginCompaction(cfs, iter);
- Table.open(cfs.table).rebuildIndex(cfs, columns, iter);
+ executor.beginCompaction(cfs, builder);
+ builder.build();
}
};
return executor.submit(runnable);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22
03:42:00 2010
@@ -33,7 +33,8 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.sstable.IKeyIterator;
+import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTableDeletingReference;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -443,39 +444,73 @@ public class Table
}
}
- public void rebuildIndex(ColumnFamilyStore cfs, SortedSet<byte[]> columns,
IKeyIterator iter)
+ public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs,
SortedSet<byte[]> columns, ReducingKeyIterator iter)
{
- while (iter.hasNext())
+ return new IndexBuilder(cfs, columns, iter);
+ }
+
+ public class IndexBuilder implements ICompactionInfo
+ {
+ private final ColumnFamilyStore cfs;
+ private final SortedSet<byte[]> columns;
+ private final ReducingKeyIterator iter;
+
+ public IndexBuilder(ColumnFamilyStore cfs, SortedSet<byte[]> columns,
ReducingKeyIterator iter)
{
- DecoratedKey key = iter.next();
- logger.debug("Indexing row {} ", key);
- HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new
HashMap<ColumnFamilyStore, Memtable>(2);
- flusherLock.readLock().lock();
- try
+ this.cfs = cfs;
+ this.columns = columns;
+ this.iter = iter;
+ }
+
+ public void build()
+ {
+ while (iter.hasNext())
{
- synchronized (indexLockFor(key.key))
+ DecoratedKey key = iter.next();
+ logger.debug("Indexing row {} ", key);
+ HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new
HashMap<ColumnFamilyStore, Memtable>(2);
+ flusherLock.readLock().lock();
+ try
{
- ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
columns);
- if (cf != null)
- applyIndexUpdates(key.key, memtablesToFlush, cf, cfs,
cf.getColumnNames(), null);
+ synchronized (indexLockFor(key.key))
+ {
+ ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
columns);
+ if (cf != null)
+ applyIndexUpdates(key.key, memtablesToFlush, cf,
cfs, cf.getColumnNames(), null);
+ }
}
+ finally
+ {
+ flusherLock.readLock().unlock();
+ }
+
+ for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
+ entry.getKey().maybeSwitchMemtable(entry.getValue(),
false, null);
+ }
+
+ try
+ {
+ iter.close();
}
- finally
+ catch (IOException e)
{
- flusherLock.readLock().unlock();
+ throw new RuntimeException(e);
}
+ }
- for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
- entry.getKey().maybeSwitchMemtable(entry.getValue(), false,
null);
+ public long getTotalBytes()
+ {
+ return iter.getTotalBytes();
}
- try
+ public long getBytesRead()
{
- iter.close();
+ return iter.getBytesRead();
}
- catch (IOException e)
+
+ public String getTaskType()
{
- throw new RuntimeException(e);
+ return "Secondary index build";
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
Wed Sep 22 03:42:00 2010
@@ -1,7 +1,9 @@
package org.apache.cassandra.io.sstable;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import org.apache.commons.collections.iterators.CollatingIterator;
@@ -9,7 +11,7 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ReducingIterator;
-public class ReducingKeyIterator implements IKeyIterator
+public class ReducingKeyIterator implements Iterator<DecoratedKey>, Closeable
{
private final CollatingIterator ci;
private final ReducingIterator<DecoratedKey, DecoratedKey> iter;