Author: jbellis
Date: Wed Sep 22 03:41:38 2010
New Revision: 999741
URL: http://svn.apache.org/viewvc?rev=999741&view=rev
Log:
allow addIndex to create indexes that did not previously exist
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed
Sep 22 03:41:38 2010
@@ -67,6 +67,7 @@ public final class CFMetaData
public static final CFMetaData HintsCf =
newSystemTable(HintedHandOffManager.HINTS_CF, 1, "hinted handoff data",
BytesType.instance, BytesType.instance);
public static final CFMetaData MigrationsCf =
newSystemTable(Migration.MIGRATIONS_CF, 2, "individual schema mutations",
TimeUUIDType.instance, null);
public static final CFMetaData SchemaCf =
newSystemTable(Migration.SCHEMA_CF, 3, "current state of the schema",
UTF8Type.instance, null);
+ public static final CFMetaData IndexCf =
newSystemTable(SystemTable.INDEX_CF, 5, "indexes that have been completed",
UTF8Type.instance, null);
private static CFMetaData newSystemTable(String cfName, int cfId, String
comment, AbstractType comparator, AbstractType subComparator)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Wed Sep 22 03:41:38 2010
@@ -350,15 +350,16 @@ public class DatabaseDescriptor
LocalStrategy.class,
null,
1,
- new
CFMetaData[]{CFMetaData.StatusCf,
-
CFMetaData.HintsCf,
-
CFMetaData.MigrationsCf,
-
CFMetaData.SchemaCf,
- });
+ CFMetaData.StatusCf,
+ CFMetaData.HintsCf,
+ CFMetaData.MigrationsCf,
+ CFMetaData.SchemaCf,
+ CFMetaData.IndexCf);
CFMetaData.map(CFMetaData.StatusCf);
CFMetaData.map(CFMetaData.HintsCf);
CFMetaData.map(CFMetaData.MigrationsCf);
CFMetaData.map(CFMetaData.SchemaCf);
+ CFMetaData.map(CFMetaData.IndexCf);
tables.put(Table.SYSTEM_TABLE, systemMeta);
/* Load the seeds for node contact points */
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed
Sep 22 03:41:38 2010
@@ -27,6 +27,8 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import org.apache.commons.collections.IteratorUtils;
@@ -41,7 +43,6 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IClock.ClockRelationship;
-import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -51,11 +52,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.LocalByPartionerType;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableTracker;
+import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
@@ -66,9 +63,6 @@ import org.apache.cassandra.utils.Latenc
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
private static Logger logger =
LoggerFactory.getLogger(ColumnFamilyStore.class);
@@ -176,7 +170,7 @@ public class ColumnFamilyStore implement
for (ColumnDefinition info : metadata.column_metadata.values())
{
if (info.index_type != null)
- addIndex(table, info);
+ addIndex(info);
}
// register the mbean
@@ -194,17 +188,35 @@ public class ColumnFamilyStore implement
}
}
- private void addIndex(String table, ColumnDefinition info)
+ public void addIndex(final ColumnDefinition info)
{
+ assert info.index_type != null;
IPartitioner rowPartitioner = StorageService.getPartitioner();
AbstractType columnComparator = (rowPartitioner instanceof
OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
? BytesType.instance
: new
LocalByPartionerType(StorageService.getPartitioner());
- CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(table,
columnFamily, info, columnComparator);
+ final CFMetaData indexedCfMetadata =
CFMetaData.newIndexMetadata(table, columnFamily, info, columnComparator);
ColumnFamilyStore indexedCfs =
ColumnFamilyStore.createColumnFamilyStore(table,
indexedCfMetadata.cfName,
new LocalPartitioner(metadata.column_metadata.get(info.name).validator),
indexedCfMetadata);
+ if (!SystemTable.isIndexBuilt(table, indexedCfMetadata.cfName))
+ {
+ logger.info("Creating index {}.{}", table,
indexedCfMetadata.cfName);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException,
ExecutionException, InterruptedException
+ {
+ logger.debug("Submitting index build to
compactionmanager");
+ ReducingKeyIterator iter = new
ReducingKeyIterator(getSSTables());
+ Future future =
CompactionManager.instance.submitIndexBuild(ColumnFamilyStore.this,
FBUtilities.getSingleColumnSet(info.name), iter);
+ future.get();
+ logger.info("Index {} complete", indexedCfMetadata.cfName);
+ SystemTable.setIndexBuilt(table, indexedCfMetadata.cfName);
+ }
+ };
+ forceFlush(runnable);
+ }
indexedColumns.put(info.name, indexedCfs);
}
@@ -397,7 +409,7 @@ public class ColumnFamilyStore implement
}
/** flush the given memtable and swap in a new one for its CFS, if it
hasn't been frozen already. threadsafe. */
- Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog)
+ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog, final Runnable afterFlush)
{
/**
* If we can get the writelock, that means no new updates can come in
and
@@ -436,6 +448,8 @@ public class ColumnFamilyStore implement
// if we're not writing to the commit log, we are
replaying the log, so marking
// the log header with "you can discard anything
written before the context" is not valid
CommitLog.instance().discardCompletedSegments(metadata.cfId, ctx);
+ if (afterFlush != null)
+ afterFlush.run();
}
}
});
@@ -465,10 +479,15 @@ public class ColumnFamilyStore implement
public Future<?> forceFlush()
{
+ return forceFlush(null);
+ }
+
+ public Future<?> forceFlush(Runnable afterFlush)
+ {
if (memtable.isClean())
return null;
- return maybeSwitchMemtable(memtable, true);
+ return maybeSwitchMemtable(memtable, true, afterFlush);
}
public void forceBlockingFlush() throws ExecutionException,
InterruptedException
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed
Sep 22 03:41:38 2010
@@ -495,14 +495,14 @@ public class CompactionManager implement
return tablePairs;
}
- public Future submitIndexBuild(final ColumnFamilyStore cfs, final
KeyIterator iter)
+ public Future submitIndexBuild(final ColumnFamilyStore cfs, final
SortedSet<byte[]> columns, final IKeyIterator iter)
{
Runnable runnable = new Runnable()
{
public void run()
{
executor.beginCompaction(cfs, iter);
- Table.open(cfs.table).rebuildIndex(cfs, iter);
+ Table.open(cfs.table).rebuildIndex(cfs, columns, iter);
}
};
return executor.submit(runnable);
@@ -528,7 +528,8 @@ public class CompactionManager implement
return
Range.isTokenInRanges(((SSTableIdentityIterator)row).getKey().token, ranges);
}
};
- CollatingIterator iter =
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+ // TODO CollatingIterator iter =
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+ CollatingIterator iter = FBUtilities.getCollatingIterator();
for (SSTableReader sstable : sstables)
{
SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Sep
22 03:41:38 2010
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.BytesType;
@@ -49,6 +50,7 @@ public class SystemTable
{
private static Logger logger = LoggerFactory.getLogger(SystemTable.class);
public static final String STATUS_CF = "LocationInfo"; // keep the old CF
string for backwards-compatibility
+ public static final String INDEX_CF = "IndexInfo";
private static final byte[] LOCATION_KEY = "L".getBytes(UTF_8);
private static final byte[] BOOTSTRAP_KEY = "Bootstrap".getBytes(UTF_8);
private static final byte[] COOKIE_KEY = "Cookies".getBytes(UTF_8);
@@ -336,6 +338,24 @@ public class SystemTable
}
}
+ public static boolean isIndexBuilt(String table, String indexName) // reports whether the system table's INDEX_CF answers a names query for column indexName under row key `table`
+ {
+ ColumnFamilyStore cfs =
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(INDEX_CF);
+ QueryFilter filter =
QueryFilter.getNamesFilter(decorate(table.getBytes(UTF_8)),
+ new
QueryPath(INDEX_CF),
+
indexName.getBytes(UTF_8)); // row key = table name bytes, single requested column = index name bytes
+ return cfs.getColumnFamily(filter) != null; // presumably null until setIndexBuilt has written the marker column -- confirm getColumnFamily's contract
+ }
+
+ public static void setIndexBuilt(String table, String indexName) throws
IOException // writes the marker column that isIndexBuilt looks for
+ {
+ ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, INDEX_CF);
+ cf.addColumn(new Column(indexName.getBytes(UTF_8),
ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()))); // empty value: isIndexBuilt only tests for presence; clock is the current wall time in millis
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE,
table.getBytes(UTF_8)); // row key is the table name, matching the key isIndexBuilt queries
+ rm.add(cf);
+ rm.apply();
+ }
+
public static class StorageMetadata
{
private Token token;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22
03:41:38 2010
@@ -33,7 +33,7 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.sstable.KeyIterator;
+import org.apache.cassandra.io.sstable.IKeyIterator;
import org.apache.cassandra.io.sstable.SSTableDeletingReference;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -42,7 +42,6 @@ import org.apache.commons.lang.ArrayUtil
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -386,7 +385,7 @@ public class Table
// flush memtables that got filled up. usually mTF will be empty and
this will be a no-op
for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
- entry.getKey().maybeSwitchMemtable(entry.getValue(),
writeCommitLog);
+ entry.getKey().maybeSwitchMemtable(entry.getValue(),
writeCommitLog, null);
}
private static void ignoreObsoleteMutations(ColumnFamily cf,
AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns,
ColumnFamily oldIndexedColumns)
@@ -444,18 +443,19 @@ public class Table
}
}
- public void rebuildIndex(ColumnFamilyStore cfs, KeyIterator iter)
+ public void rebuildIndex(ColumnFamilyStore cfs, SortedSet<byte[]> columns,
IKeyIterator iter)
{
while (iter.hasNext())
{
DecoratedKey key = iter.next();
+ logger.debug("Indexing row {} ", key);
HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new
HashMap<ColumnFamilyStore, Memtable>(2);
flusherLock.readLock().lock();
try
{
synchronized (indexLockFor(key.key))
{
- ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
cfs.getIndexedColumns());
+ ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
columns);
applyIndexUpdates(key.key, memtablesToFlush, cf, cfs,
cf.getColumnNames(), null);
}
}
@@ -465,7 +465,16 @@ public class Table
}
for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
- entry.getKey().maybeSwitchMemtable(entry.getValue(), false);
+ entry.getKey().maybeSwitchMemtable(entry.getValue(), false,
null);
+ }
+
+ try
+ {
+ iter.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
Wed Sep 22 03:41:38 2010
@@ -30,6 +30,7 @@ import org.apache.cassandra.io.sstable.S
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
public class NamesQueryFilter implements IFilter
{
@@ -42,21 +43,7 @@ public class NamesQueryFilter implements
public NamesQueryFilter(byte[] column)
{
- this(getSingleColumnSet(column));
- }
-
- private static TreeSet<byte[]> getSingleColumnSet(byte[] column)
- {
- Comparator<byte[]> singleColumnComparator = new Comparator<byte[]>()
- {
- public int compare(byte[] o1, byte[] o2)
- {
- return Arrays.equals(o1, o2) ? 0 : -1;
- }
- };
- TreeSet<byte[]> set = new TreeSet<byte[]>(singleColumnComparator);
- set.add(column);
- return set;
+ this(FBUtilities.getSingleColumnSet(column));
}
public IColumnIterator getMemtableColumnIterator(ColumnFamily cf,
DecoratedKey key, AbstractType comparator)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
Wed Sep 22 03:41:38 2010
@@ -78,7 +78,8 @@ implements Closeable, ICompactionInfo
@SuppressWarnings("unchecked")
protected static CollatingIterator
getCollatingIterator(Iterable<SSTableReader> sstables) throws IOException
{
- CollatingIterator iter =
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+ // TODO CollatingIterator iter =
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+ CollatingIterator iter = FBUtilities.getCollatingIterator();
for (SSTableReader sstable : sstables)
{
iter.addIterator(sstable.getScanner(FILE_BUFFER_SIZE));
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java?rev=999741&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
Wed Sep 22 03:41:38 2010
@@ -0,0 +1,11 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.ICompactionInfo;
+
+public interface IKeyIterator extends Iterator<DecoratedKey>, ICompactionInfo,
Closeable
+{ // combines a DecoratedKey iterator with ICompactionInfo and Closeable; declares no methods of its own
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
Wed Sep 22 03:41:38 2010
@@ -13,15 +13,22 @@ import org.apache.cassandra.io.util.Buff
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public class KeyIterator extends AbstractIterator<DecoratedKey> implements
ICompactionInfo, Closeable
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements
IKeyIterator
{
private final BufferedRandomAccessFile in;
private final Descriptor desc;
- public KeyIterator(Descriptor desc) throws IOException
+ public KeyIterator(Descriptor desc)
{
this.desc = desc;
- in = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "r");
+ try
+ {
+ in = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "r");
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
protected DecoratedKey computeNext()
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=999741&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
Wed Sep 22 03:41:38 2010
@@ -0,0 +1,83 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.collections.iterators.CollatingIterator;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ReducingIterator;
+
+public class ReducingKeyIterator implements IKeyIterator // exposes the keys of several sstables as one IKeyIterator
+{
+ private final CollatingIterator ci; // holds one KeyIterator per sstable given to the constructor
+ private final ReducingIterator<DecoratedKey, DecoratedKey> iter; // wraps ci; hasNext()/next() delegate to it
+
+ public ReducingKeyIterator(Collection<SSTableReader> sstables)
+ {
+ ci = FBUtilities.getCollatingIterator();
+ for (SSTableReader sstable : sstables)
+ {
+ ci.addIterator(new KeyIterator(sstable.desc)); // KeyIterator opens the sstable's index component and throws IOError if that fails
+ }
+
+ iter = new ReducingIterator<DecoratedKey, DecoratedKey>(ci) // presumably collapses runs of equal keys coming from different sstables into one -- confirm against ReducingIterator
+ {
+ DecoratedKey reduced = null;
+
+ public void reduce(DecoratedKey current)
+ {
+ reduced = current; // remember only the most recent key handed to reduce()
+ }
+
+ protected DecoratedKey getReduced()
+ {
+ return reduced;
+ }
+ };
+ }
+
+ public void close() throws IOException // closes each underlying KeyIterator in turn
+ {
+ for (Object o : ci.getIterators())
+ {
+ ((KeyIterator) o).close(); // cast is safe: the constructor only adds KeyIterators. NOTE(review): an IOException here leaves the remaining iterators unclosed
+ }
+ }
+
+ public long getTotalBytes() // sum of getTotalBytes() over all underlying KeyIterators
+ {
+ long m = 0;
+ for (Object o : ci.getIterators())
+ {
+ m += ((KeyIterator) o).getTotalBytes();
+ }
+ return m;
+ }
+
+ public long getBytesRead() // sum of getBytesRead() over all underlying KeyIterators
+ {
+ long m = 0;
+ for (Object o : ci.getIterators())
+ {
+ m += ((KeyIterator) o).getBytesRead();
+ }
+ return m;
+ }
+
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public DecoratedKey next()
+ {
+ return (DecoratedKey) iter.next();
+ }
+
+ public void remove() // removal is not supported
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Wed Sep 22 03:41:38 2010
@@ -276,7 +276,7 @@ public class SSTableWriter extends SSTab
if (!cfs.getIndexedColumns().isEmpty())
{
- Future future = CompactionManager.instance.submitIndexBuild(cfs,
new KeyIterator(desc));
+ Future future = CompactionManager.instance.submitIndexBuild(cfs,
cfs.getIndexedColumns(), new KeyIterator(desc));
try
{
future.get();
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed
Sep 22 03:41:38 2010
@@ -338,6 +338,8 @@ public class FBUtilities
}
}
+ /*
+ TODO how to make this work w/ ReducingKeyIterator?
public static <T extends Comparable<T>> CollatingIterator
getCollatingIterator()
{
// CollatingIterator will happily NPE if you do not specify a
comparator explicitly
@@ -349,6 +351,18 @@ public class FBUtilities
}
});
}
+ */
+ public static CollatingIterator getCollatingIterator() // raw-typed stand-in for the generic version commented out just above
+ {
+ // CollatingIterator will happily NPE if you do not specify a
comparator explicitly
+ return new CollatingIterator(new Comparator()
+ {
+ public int compare(Object o1, Object o2)
+ {
+ return ((Comparable) o1).compareTo(o2); // unchecked: elements that are not Comparable fail here with ClassCastException
+ }
+ });
+ }
public static void atomicSetMax(AtomicInteger atomic, int i)
{
@@ -656,4 +670,18 @@ public class FBUtilities
}
}
}
+
+ public static TreeSet<byte[]> getSingleColumnSet(byte[] column) // returns a TreeSet containing only the given column name
+ {
+ Comparator<byte[]> singleColumnComparator = new Comparator<byte[]>()
+ {
+ public int compare(byte[] o1, byte[] o2)
+ {
+ return Arrays.equals(o1, o2) ? 0 : -1; // equality test only: never returns a positive value, so this is not a real ordering
+ }
+ };
+ TreeSet<byte[]> set = new TreeSet<byte[]>(singleColumnComparator);
+ set.add(column); // NOTE(review): comparator looks adequate only while the set keeps this single element -- confirm callers never add more
+ return set;
+ }
}
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Wed Sep 22 03:41:38 2010
@@ -74,6 +74,12 @@ keyspaces:
validator_class: LongType
index_type: KEYS
+ - name: Indexed2
+ column_metadata:
+ - name: birthdate
+ validator_class: LongType
+ # index will be added dynamically
+
- name: Keyspace2
replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
replication_factor: 1
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Wed Sep 22 03:41:38 2010
@@ -31,16 +31,21 @@ import org.junit.Test;
import static junit.framework.Assert.assertEquals;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.CollatingOrderPreservingPartitioner;
@@ -253,6 +258,34 @@ public class ColumnFamilyStoreTest exten
assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
}
+ @Test
+ public void testIndexCreate() throws IOException, ConfigurationException,
InterruptedException
+ {
+ Table table = Table.open("Keyspace1");
+
+ // insert a row into Indexed2 before any index exists, then add the index
dynamically and check that an index query finds the pre-existing row
+ RowMutation rm;
+ rm = new RowMutation("Keyspace1", "k1".getBytes());
+ rm.add(new QueryPath("Indexed2", null, "birthdate".getBytes("UTF8")),
FBUtilities.toByteArray(1L), new TimestampClock(1));
+ rm.apply();
+
+ ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed2");
+ ColumnDefinition old =
cfs.metadata.column_metadata.get("birthdate".getBytes("UTF8"));
+ ColumnDefinition cd = new ColumnDefinition(old.name,
old.validator.getClass().getName(), IndexType.KEYS, "birthdate_index");
+ cfs.addIndex(cd);
+ while (!SystemTable.isIndexBuilt("Keyspace1",
cfs.getIndexedColumnFamilyStore("birthdate".getBytes("UTF8")).columnFamily))
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ IndexExpression expr = new
IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(1L));
+ IndexClause clause = new IndexClause(Arrays.asList(expr),
ArrayUtils.EMPTY_BYTE_ARRAY, 100);
+ IFilter filter = new IdentityQueryFilter();
+ IPartitioner p = StorageService.getPartitioner();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+ List<Row> rows = table.getColumnFamilyStore("Indexed2").scan(clause,
range, filter);
+ assert rows.size() == 1 : StringUtils.join(rows, ",");
+ assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+ }
+
private ColumnFamilyStore insertKey1Key2() throws IOException,
ExecutionException, InterruptedException
{
List<RowMutation> rms = new LinkedList<RowMutation>();