Author: jbellis
Date: Wed Sep 22 03:41:38 2010
New Revision: 999741

URL: http://svn.apache.org/viewvc?rev=999741&view=rev
Log:
allow addIndex to create indexes that did not previously exist
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415


Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    cassandra/trunk/test/conf/cassandra.yaml
    cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Sep 22 03:41:38 2010
@@ -67,6 +67,7 @@ public final class CFMetaData
     public static final CFMetaData HintsCf = 
newSystemTable(HintedHandOffManager.HINTS_CF, 1, "hinted handoff data", 
BytesType.instance, BytesType.instance);
     public static final CFMetaData MigrationsCf = 
newSystemTable(Migration.MIGRATIONS_CF, 2, "individual schema mutations", 
TimeUUIDType.instance, null);
     public static final CFMetaData SchemaCf = 
newSystemTable(Migration.SCHEMA_CF, 3, "current state of the schema", 
UTF8Type.instance, null);
+    public static final CFMetaData IndexCf = 
newSystemTable(SystemTable.INDEX_CF, 5, "indexes that have been completed", 
UTF8Type.instance, null);
 
     private static CFMetaData newSystemTable(String cfName, int cfId, String 
comment, AbstractType comparator, AbstractType subComparator)
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Sep 22 03:41:38 2010
@@ -350,15 +350,16 @@ public class DatabaseDescriptor
                                                    LocalStrategy.class,
                                                    null,
                                                    1,
-                                                   new 
CFMetaData[]{CFMetaData.StatusCf,
-                                                                    
CFMetaData.HintsCf,
-                                                                    
CFMetaData.MigrationsCf,
-                                                                    
CFMetaData.SchemaCf,
-            });
+                                                   CFMetaData.StatusCf,
+                                                   CFMetaData.HintsCf,
+                                                   CFMetaData.MigrationsCf,
+                                                   CFMetaData.SchemaCf,
+                                                   CFMetaData.IndexCf);
             CFMetaData.map(CFMetaData.StatusCf);
             CFMetaData.map(CFMetaData.HintsCf);
             CFMetaData.map(CFMetaData.MigrationsCf);
             CFMetaData.map(CFMetaData.SchemaCf);
+            CFMetaData.map(CFMetaData.IndexCf);
             tables.put(Table.SYSTEM_TABLE, systemMeta);
             
             /* Load the seeds for node contact points */

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 22 03:41:38 2010
@@ -27,6 +27,8 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
 import org.apache.commons.collections.IteratorUtils;
@@ -41,7 +43,6 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IClock.ClockRelationship;
-import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -51,11 +52,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.LocalByPartionerType;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableTracker;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
@@ -66,9 +63,6 @@ import org.apache.cassandra.utils.Latenc
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
     private static Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class);
@@ -176,7 +170,7 @@ public class ColumnFamilyStore implement
         for (ColumnDefinition info : metadata.column_metadata.values())
         {
             if (info.index_type != null)
-                addIndex(table, info);
+                addIndex(info);
         }
 
         // register the mbean
@@ -194,17 +188,35 @@ public class ColumnFamilyStore implement
         }
     }
 
-    private void addIndex(String table, ColumnDefinition info)
+    public void addIndex(final ColumnDefinition info)
     {
+        assert info.index_type != null;
         IPartitioner rowPartitioner = StorageService.getPartitioner();
         AbstractType columnComparator = (rowPartitioner instanceof 
OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
                                         ? BytesType.instance
                                         : new 
LocalByPartionerType(StorageService.getPartitioner());
-        CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(table, 
columnFamily, info, columnComparator);
+        final CFMetaData indexedCfMetadata = 
CFMetaData.newIndexMetadata(table, columnFamily, info, columnComparator);
         ColumnFamilyStore indexedCfs = 
ColumnFamilyStore.createColumnFamilyStore(table,
                                                                                
  indexedCfMetadata.cfName,
                                                                                
  new LocalPartitioner(metadata.column_metadata.get(info.name).validator),
                                                                                
  indexedCfMetadata);
+        if (!SystemTable.isIndexBuilt(table, indexedCfMetadata.cfName))
+        {
+            logger.info("Creating index {}.{}", table, 
indexedCfMetadata.cfName);
+            Runnable runnable = new WrappedRunnable()
+            {
+                public void runMayThrow() throws IOException, 
ExecutionException, InterruptedException
+                {
+                    logger.debug("Submitting index build to 
compactionmanager");
+                    ReducingKeyIterator iter = new 
ReducingKeyIterator(getSSTables());
+                    Future future = 
CompactionManager.instance.submitIndexBuild(ColumnFamilyStore.this, 
FBUtilities.getSingleColumnSet(info.name), iter);
+                    future.get();
+                    logger.info("Index {} complete", indexedCfMetadata.cfName);
+                    SystemTable.setIndexBuilt(table, indexedCfMetadata.cfName);
+                }
+            };
+            forceFlush(runnable);
+        }
         indexedColumns.put(info.name, indexedCfs);
     }
 
@@ -397,7 +409,7 @@ public class ColumnFamilyStore implement
     }
 
     /** flush the given memtable and swap in a new one for its CFS, if it 
hasn't been frozen already.  threadsafe. */
-    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog)
+    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog, final Runnable afterFlush)
     {
         /**
          *  If we can get the writelock, that means no new updates can come in 
and 
@@ -436,6 +448,8 @@ public class ColumnFamilyStore implement
                         // if we're not writing to the commit log, we are 
replaying the log, so marking
                         // the log header with "you can discard anything 
written before the context" is not valid
                         
CommitLog.instance().discardCompletedSegments(metadata.cfId, ctx);
+                        if (afterFlush != null)
+                            afterFlush.run();
                     }
                 }
             });
@@ -465,10 +479,15 @@ public class ColumnFamilyStore implement
 
     public Future<?> forceFlush()
     {
+        return forceFlush(null);
+    }
+
+    public Future<?> forceFlush(Runnable afterFlush)
+    {
         if (memtable.isClean())
             return null;
 
-        return maybeSwitchMemtable(memtable, true);
+        return maybeSwitchMemtable(memtable, true, afterFlush);
     }
 
     public void forceBlockingFlush() throws ExecutionException, 
InterruptedException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Sep 22 03:41:38 2010
@@ -495,14 +495,14 @@ public class CompactionManager implement
         return tablePairs;
     }
 
-    public Future submitIndexBuild(final ColumnFamilyStore cfs, final 
KeyIterator iter)
+    public Future submitIndexBuild(final ColumnFamilyStore cfs, final 
SortedSet<byte[]> columns, final IKeyIterator iter)
     {
         Runnable runnable = new Runnable()
         {
             public void run()
             {
                 executor.beginCompaction(cfs, iter);
-                Table.open(cfs.table).rebuildIndex(cfs, iter);
+                Table.open(cfs.table).rebuildIndex(cfs, columns, iter);
             }
         };
         return executor.submit(runnable);
@@ -528,7 +528,8 @@ public class CompactionManager implement
                     return 
Range.isTokenInRanges(((SSTableIdentityIterator)row).getKey().token, ranges);
                 }
             };
-            CollatingIterator iter = 
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+            // TODO CollatingIterator iter = 
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+            CollatingIterator iter = FBUtilities.getCollatingIterator();
             for (SSTableReader sstable : sstables)
             {
                 SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Sep 22 03:41:38 2010
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -49,6 +50,7 @@ public class SystemTable
 {
     private static Logger logger = LoggerFactory.getLogger(SystemTable.class);
     public static final String STATUS_CF = "LocationInfo"; // keep the old CF 
string for backwards-compatibility
+    public static final String INDEX_CF = "IndexInfo";
     private static final byte[] LOCATION_KEY = "L".getBytes(UTF_8);
     private static final byte[] BOOTSTRAP_KEY = "Bootstrap".getBytes(UTF_8);
     private static final byte[] COOKIE_KEY = "Cookies".getBytes(UTF_8);
@@ -336,6 +338,24 @@ public class SystemTable
         }
     }
 
+    public static boolean isIndexBuilt(String table, String indexName)
+    {
+        ColumnFamilyStore cfs = 
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(INDEX_CF);
+        QueryFilter filter = 
QueryFilter.getNamesFilter(decorate(table.getBytes(UTF_8)),
+                                                        new 
QueryPath(INDEX_CF),
+                                                        
indexName.getBytes(UTF_8));
+        return cfs.getColumnFamily(filter) != null;
+    }
+
+    public static void setIndexBuilt(String table, String indexName) throws 
IOException
+    {
+        ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, INDEX_CF);
+        cf.addColumn(new Column(indexName.getBytes(UTF_8), 
ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis())));
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
table.getBytes(UTF_8));
+        rm.add(cf);
+        rm.apply();
+    }
+
     public static class StorageMetadata
     {
         private Token token;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 03:41:38 2010
@@ -33,7 +33,7 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.sstable.KeyIterator;
+import org.apache.cassandra.io.sstable.IKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableDeletingReference;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -42,7 +42,6 @@ import org.apache.commons.lang.ArrayUtil
 
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -386,7 +385,7 @@ public class Table
 
         // flush memtables that got filled up.  usually mTF will be empty and 
this will be a no-op
         for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
-            entry.getKey().maybeSwitchMemtable(entry.getValue(), 
writeCommitLog);
+            entry.getKey().maybeSwitchMemtable(entry.getValue(), 
writeCommitLog, null);
     }
 
     private static void ignoreObsoleteMutations(ColumnFamily cf, 
AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, 
ColumnFamily oldIndexedColumns)
@@ -444,18 +443,19 @@ public class Table
         }
     }
 
-    public void rebuildIndex(ColumnFamilyStore cfs, KeyIterator iter)
+    public void rebuildIndex(ColumnFamilyStore cfs, SortedSet<byte[]> columns, 
IKeyIterator iter)
     {
         while (iter.hasNext())
         {
             DecoratedKey key = iter.next();
+            logger.debug("Indexing row {} ", key);
             HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new 
HashMap<ColumnFamilyStore, Memtable>(2);
             flusherLock.readLock().lock();
             try
             {
                 synchronized (indexLockFor(key.key))
                 {
-                    ColumnFamily cf = readCurrentIndexedColumns(key, cfs, 
cfs.getIndexedColumns());
+                    ColumnFamily cf = readCurrentIndexedColumns(key, cfs, 
columns);
                     applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, 
cf.getColumnNames(), null);
                 }
             }
@@ -465,7 +465,16 @@ public class Table
             }
 
             for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
-                entry.getKey().maybeSwitchMemtable(entry.getValue(), false);
+                entry.getKey().maybeSwitchMemtable(entry.getValue(), false, 
null);
+        }
+
+        try
+        {
+            iter.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java Wed Sep 22 03:41:38 2010
@@ -30,6 +30,7 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class NamesQueryFilter implements IFilter
 {
@@ -42,21 +43,7 @@ public class NamesQueryFilter implements
 
     public NamesQueryFilter(byte[] column)
     {
-        this(getSingleColumnSet(column));
-    }
-
-    private static TreeSet<byte[]> getSingleColumnSet(byte[] column)
-    {
-        Comparator<byte[]> singleColumnComparator = new Comparator<byte[]>()
-        {
-            public int compare(byte[] o1, byte[] o2)
-            {
-                return Arrays.equals(o1, o2) ? 0 : -1;
-            }
-        };
-        TreeSet<byte[]> set = new TreeSet<byte[]>(singleColumnComparator);
-        set.add(column);
-        return set;
+        this(FBUtilities.getSingleColumnSet(column));
     }
 
     public IColumnIterator getMemtableColumnIterator(ColumnFamily cf, 
DecoratedKey key, AbstractType comparator)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Sep 22 03:41:38 2010
@@ -78,7 +78,8 @@ implements Closeable, ICompactionInfo
     @SuppressWarnings("unchecked")
     protected static CollatingIterator 
getCollatingIterator(Iterable<SSTableReader> sstables) throws IOException
     {
-        CollatingIterator iter = 
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+        // TODO CollatingIterator iter = 
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
+        CollatingIterator iter = FBUtilities.getCollatingIterator();
         for (SSTableReader sstable : sstables)
         {
             iter.addIterator(sstable.getScanner(FILE_BUFFER_SIZE));

Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java?rev=999741&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java Wed Sep 22 03:41:38 2010
@@ -0,0 +1,11 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.ICompactionInfo;
+
+public interface IKeyIterator extends Iterator<DecoratedKey>, ICompactionInfo, 
Closeable
+{
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java Wed Sep 22 03:41:38 2010
@@ -13,15 +13,22 @@ import org.apache.cassandra.io.util.Buff
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class KeyIterator extends AbstractIterator<DecoratedKey> implements 
ICompactionInfo, Closeable
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements 
IKeyIterator
 {
     private final BufferedRandomAccessFile in;
     private final Descriptor desc;
 
-    public KeyIterator(Descriptor desc) throws IOException
+    public KeyIterator(Descriptor desc)
     {
         this.desc = desc;
-        in = new BufferedRandomAccessFile(new 
File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "r");
+        try
+        {
+            in = new BufferedRandomAccessFile(new 
File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "r");
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
     }
 
     protected DecoratedKey computeNext()

Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=999741&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Wed Sep 22 03:41:38 2010
@@ -0,0 +1,83 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.collections.iterators.CollatingIterator;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ReducingIterator;
+
+public class ReducingKeyIterator implements IKeyIterator
+{
+    private final CollatingIterator ci;
+    private final ReducingIterator<DecoratedKey, DecoratedKey> iter;
+
+    public ReducingKeyIterator(Collection<SSTableReader> sstables)
+    {
+        ci = FBUtilities.getCollatingIterator();
+        for (SSTableReader sstable : sstables)
+        {
+            ci.addIterator(new KeyIterator(sstable.desc));
+        }
+
+        iter = new ReducingIterator<DecoratedKey, DecoratedKey>(ci)
+        {
+            DecoratedKey reduced = null;
+
+            public void reduce(DecoratedKey current)
+            {
+                reduced = current;
+            }
+
+            protected DecoratedKey getReduced()
+            {
+                return reduced;
+            }
+        };
+    }
+
+    public void close() throws IOException
+    {
+        for (Object o : ci.getIterators())
+        {
+            ((KeyIterator) o).close();
+        }
+    }
+
+    public long getTotalBytes()
+    {
+        long m = 0;
+        for (Object o : ci.getIterators())
+        {
+            m += ((KeyIterator) o).getTotalBytes();
+        }
+        return m;
+    }
+
+    public long getBytesRead()
+    {
+        long m = 0;
+        for (Object o : ci.getIterators())
+        {
+            m += ((KeyIterator) o).getBytesRead();
+        }
+        return m;
+    }
+
+    public boolean hasNext()
+    {
+        return iter.hasNext();
+    }
+
+    public DecoratedKey next()
+    {
+        return (DecoratedKey) iter.next();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Sep 22 03:41:38 2010
@@ -276,7 +276,7 @@ public class SSTableWriter extends SSTab
 
         if (!cfs.getIndexedColumns().isEmpty())
         {
-            Future future = CompactionManager.instance.submitIndexBuild(cfs, 
new KeyIterator(desc));
+            Future future = CompactionManager.instance.submitIndexBuild(cfs, 
cfs.getIndexedColumns(), new KeyIterator(desc));
             try
             {
                 future.get();

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Sep 22 03:41:38 2010
@@ -338,6 +338,8 @@ public class FBUtilities
         }
     }
 
+    /*
+    TODO how to make this work w/ ReducingKeyIterator?
     public static <T extends Comparable<T>> CollatingIterator 
getCollatingIterator()
     {
         // CollatingIterator will happily NPE if you do not specify a 
comparator explicitly
@@ -349,6 +351,18 @@ public class FBUtilities
             }
         });
     }
+     */
+    public static CollatingIterator getCollatingIterator()
+    {
+        // CollatingIterator will happily NPE if you do not specify a 
comparator explicitly
+        return new CollatingIterator(new Comparator()
+        {
+            public int compare(Object o1, Object o2)
+            {
+                return ((Comparable) o1).compareTo(o2);
+            }
+        });
+    }
 
     public static void atomicSetMax(AtomicInteger atomic, int i)
     {
@@ -656,4 +670,18 @@ public class FBUtilities
             }
         }
     }
+
+    public static TreeSet<byte[]> getSingleColumnSet(byte[] column)
+    {
+        Comparator<byte[]> singleColumnComparator = new Comparator<byte[]>()
+        {
+            public int compare(byte[] o1, byte[] o2)
+            {
+                return Arrays.equals(o1, o2) ? 0 : -1;
+            }
+        };
+        TreeSet<byte[]> set = new TreeSet<byte[]>(singleColumnComparator);
+        set.add(column);
+        return set;
+    }
 }

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Wed Sep 22 03:41:38 2010
@@ -74,6 +74,12 @@ keyspaces:
               validator_class: LongType
               index_type: KEYS
 
+        - name: Indexed2 
+          column_metadata:
+            - name: birthdate
+              validator_class: LongType
+              # index will be added dynamically
+
     - name: Keyspace2
       replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
       replication_factor: 1

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=999741&r1=999740&r2=999741&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Sep 22 03:41:38 2010
@@ -31,16 +31,21 @@ import org.junit.Test;
 import static junit.framework.Assert.assertEquals;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.CollatingOrderPreservingPartitioner;
@@ -253,6 +258,34 @@ public class ColumnFamilyStoreTest exten
         assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
     }
 
+    @Test
+    public void testIndexCreate() throws IOException, ConfigurationException, 
InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+
+        // create a row and update the birthdate value, test that the index 
query fetches the new version
+        RowMutation rm;
+        rm = new RowMutation("Keyspace1", "k1".getBytes());
+        rm.add(new QueryPath("Indexed2", null, "birthdate".getBytes("UTF8")), 
FBUtilities.toByteArray(1L), new TimestampClock(1));
+        rm.apply();
+
+        ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed2");
+        ColumnDefinition old = 
cfs.metadata.column_metadata.get("birthdate".getBytes("UTF8"));
+        ColumnDefinition cd = new ColumnDefinition(old.name, 
old.validator.getClass().getName(), IndexType.KEYS, "birthdate_index");
+        cfs.addIndex(cd);
+        while (!SystemTable.isIndexBuilt("Keyspace1", 
cfs.getIndexedColumnFamilyStore("birthdate".getBytes("UTF8")).columnFamily))
+            TimeUnit.MILLISECONDS.sleep(100);
+
+        IndexExpression expr = new 
IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, 
FBUtilities.toByteArray(1L));
+        IndexClause clause = new IndexClause(Arrays.asList(expr), 
ArrayUtils.EMPTY_BYTE_ARRAY, 100);
+        IFilter filter = new IdentityQueryFilter();
+        IPartitioner p = StorageService.getPartitioner();
+        Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+        List<Row> rows = table.getColumnFamilyStore("Indexed2").scan(clause, 
range, filter);
+        assert rows.size() == 1 : StringUtils.join(rows, ",");
+        assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+    }
+
     private ColumnFamilyStore insertKey1Key2() throws IOException, 
ExecutionException, InterruptedException
     {
         List<RowMutation> rms = new LinkedList<RowMutation>();


Reply via email to