Author: jbellis
Date: Thu Dec 30 16:33:17 2010
New Revision: 1053933

URL: http://svn.apache.org/viewvc?rev=1053933&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:33:17 2010
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1053763
-/cassandra/branches/cassandra-0.7:1026516-1053893
+/cassandra/branches/cassandra-0.7:1026516-1053893,1053928,1053931
 /cassandra/branches/cassandra-0.7.0:1053690-1053891
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1053933&r1=1053932&r2=1053933&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Dec 30 16:33:17 2010
@@ -27,6 +27,7 @@
  * change RandomPartitioner min token to -1 to avoid collision w/
    tokens on actual nodes (CASSANDRA-1901)
  * examine the right nibble when validating TimeUUID (CASSANDRA-1910)
+ * include secondary indexes in cleanup (CASSANDRA-1916)
  * CFS.scrubDataDirectories should also cleanup invalid secondary indexes
    (CASSANDRA-1904)
 

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:33:17 2010
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1053763
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1053893
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1053893,1053928,1053931
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1053891
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:33:17 2010
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1053763
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1053893
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1053893,1053928,1053931
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1053891
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:33:17 2010
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1053763
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1053893
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1053893,1053928,1053931
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1053891
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:33:17 2010
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1053763
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1053893
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1053893,1053928,1053931
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1053891
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:33:17 2010
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1053763
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1053893
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1053893,1053928,1053931
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1053891
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1053933&r1=1053932&r2=1053933&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Thu 
Dec 30 16:33:17 2010
@@ -18,10 +18,12 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
@@ -31,7 +33,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.collections.PredicateUtils;
-import org.apache.commons.collections.iterators.CollatingIterator;
 import org.apache.commons.collections.iterators.FilterIterator;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -48,7 +49,6 @@ import org.apache.cassandra.io.util.File
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -369,96 +369,109 @@ public class CompactionManager implement
     }
 
     /**
-     * This function is used to do the anti compaction process , it spits out 
the file which has keys that belong to a given range
-     * If the target is not specified it spits out the file as a compacted 
file with the unecessary ranges wiped out.
+     * This function goes over each file and removes the keys that the node is 
not responsible for
+     * and only keeps keys that this node is responsible for.
      *
-     * @param cfs
-     * @param sstables
-     * @param ranges
-     * @param target
-     * @return
-     * @throws java.io.IOException
+     * @throws IOException
      */
-    private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
-            throws IOException
+    private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
     {
+        assert !cfs.isIndex();
         Table table = cfs.table;
-        logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") + 
"]");
-        // Calculate the expected compacted filesize
-        long expectedRangeFileSize = 
cfs.getExpectedCompactedFileSize(sstables) / 2;
-        String compactionFileLocation = 
table.getDataFileLocation(expectedRangeFileSize);
-        if (compactionFileLocation == null)
-        {
-            throw new UnsupportedOperationException("disk full");
-        }
+        Collection<Range> ranges = 
StorageService.instance.getLocalRanges(table.name);
 
-        List<SSTableReader> results = new ArrayList<SSTableReader>();
-        long startTime = System.currentTimeMillis();
-        long totalkeysWritten = 0;
-
-        int expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(), 
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
-        if (logger.isDebugEnabled())
-          logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
-
-        SSTableWriter writer = null;
-        CompactionIterator ci = new AntiCompactionIterator(cfs, sstables, 
ranges, (int) (System.currentTimeMillis() / 1000) - 
cfs.metadata.getGcGraceSeconds(), cfs.isCompleteSSTables(sstables));
-        Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, 
PredicateUtils.notNullPredicate());
-        executor.beginCompaction(cfs, ci);
-
-        try
+        for (SSTableReader sstable : cfs.getSSTables())
         {
-            if (!nni.hasNext())
+            logger.info("Cleaning up " + sstable);
+            // Calculate the expected compacted filesize
+            long expectedRangeFileSize = 
cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
+            String compactionFileLocation = 
table.getDataFileLocation(expectedRangeFileSize);
+            if (compactionFileLocation == null)
+                throw new UnsupportedOperationException("disk full");
+
+            long startTime = System.currentTimeMillis();
+            long totalkeysWritten = 0;
+
+            int expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(),
+                                                   
(int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)) / 2));
+            if (logger.isDebugEnabled())
+              logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
+
+            SSTableWriter writer = null;
+            SSTableScanner scanner = 
sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+            SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
+            executor.beginCompaction(cfs, new CleanupInfo(sstable, scanner));
+            try
             {
-                return results;
+                while (scanner.hasNext())
+                {
+                    SSTableIdentityIterator row = (SSTableIdentityIterator) 
scanner.next();
+                    if (Range.isTokenInRanges(row.getKey().token, ranges))
+                    {
+                        writer = maybeCreateWriter(cfs, 
compactionFileLocation, expectedBloomFilterSize, writer);
+                        writer.append(new EchoedRow(row));
+                        totalkeysWritten++;
+                    }
+                    else
+                    {
+                        while (row.hasNext())
+                        {
+                            IColumn column = row.next();
+                            if (indexedColumns.contains(column.name()))
+                                Table.cleanupIndexEntry(cfs, row.getKey().key, 
column);
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                scanner.close();
             }
 
-            while (nni.hasNext())
+            List<SSTableReader> results = new ArrayList<SSTableReader>();
+            if (writer != null)
             {
-                AbstractCompactedRow row = nni.next();
-                if (writer == null)
+                SSTableReader newSstable = 
writer.closeAndOpenReader(sstable.maxDataAge);
+                results.add(newSstable);
+
+                String format = "Cleaned up to %s.  %,d to %,d (~%d%% of 
original) bytes for %,d keys.  Time: %,dms.";
+                long dTime = System.currentTimeMillis() - startTime;
+                long startsize = sstable.length();
+                long endsize = newSstable.length();
+                double ratio = (double)endsize / (double)startsize;
+                logger.info(String.format(format, writer.getFilename(), 
startsize, endsize, (int)(ratio*100), totalkeysWritten, dTime));
+            }
+
+            // flush to ensure we don't lose the tombstones on a restart, 
since they are not commitlog'd
+            for (ByteBuffer columnName : cfs.getIndexedColumns())
+            {
+                try
                 {
-                    FileUtils.createDirectory(compactionFileLocation);
-                    String newFilename = new 
File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
-                    writer = new SSTableWriter(newFilename, 
expectedBloomFilterSize, cfs.metadata, cfs.partitioner);
+                    
cfs.getIndexedColumnFamilyStore(columnName).forceBlockingFlush();
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
                 }
-                writer.append(row);
-                totalkeysWritten++;
             }
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
         }
-        finally
-        {
-            ci.close();
-        }
-
-        if (writer != null)
-        {
-            results.add(writer.closeAndOpenReader(getMaxDataAge(sstables)));
-
-            String format = "AntiCompacted to %s.  %,d to %,d (~%d%% of 
original) bytes for %,d keys.  Time: %,dms.";
-            long dTime = System.currentTimeMillis() - startTime;
-            long startsize = SSTable.getTotalBytes(sstables);
-            long endsize = results.get(0).length();
-            double ratio = (double)endsize / (double)startsize;
-            logger.info(String.format(format, writer.getFilename(), startsize, 
endsize, (int)(ratio*100), totalkeysWritten, dTime));
-        }
-
-        return results;
     }
 
-    /**
-     * This function goes over each file and removes the keys that the node is 
not responsible for
-     * and only keeps keys that this node is responsible for.
-     *
-     * @throws IOException
-     */
-    private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
+    private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String 
compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
+            throws IOException
     {
-        Collection<SSTableReader> originalSSTables = cfs.getSSTables();
-        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, 
StorageService.instance.getLocalRanges(cfs.table.name), null);
-        if (!sstables.isEmpty())
+        if (writer == null)
         {
-            cfs.replaceCompactedSSTables(originalSSTables, sstables);
+            FileUtils.createDirectory(compactionFileLocation);
+            String newFilename = new 
File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
+            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, 
cfs.metadata, cfs.partitioner);
         }
+        return writer;
     }
 
     /**
@@ -634,55 +647,6 @@ public class CompactionManager implement
         }
     }
 
-    private static class AntiCompactionIterator extends CompactionIterator
-    {
-        private Set<SSTableScanner> scanners;
-
-        public AntiCompactionIterator(ColumnFamilyStore cfStore, 
Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore, 
boolean isMajor)
-                throws IOException
-        {
-            super(cfStore, getCollatedRangeIterator(sstables, ranges), 
gcBefore, isMajor);
-        }
-
-        private static Iterator 
getCollatedRangeIterator(Collection<SSTableReader> sstables, final 
Collection<Range> ranges)
-                throws IOException
-        {
-            org.apache.commons.collections.Predicate rangesPredicate = new 
org.apache.commons.collections.Predicate()
-            {
-                public boolean evaluate(Object row)
-                {
-                    return 
Range.isTokenInRanges(((SSTableIdentityIterator)row).getKey().token, ranges);
-                }
-            };
-            // TODO CollatingIterator iter = 
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
-            CollatingIterator iter = FBUtilities.getCollatingIterator();
-            for (SSTableReader sstable : sstables)
-            {
-                SSTableScanner scanner = 
sstable.getDirectScanner(FILE_BUFFER_SIZE);
-                iter.addIterator(new FilterIterator(scanner, rangesPredicate));
-            }
-            return iter;
-        }
-
-        public Iterable<SSTableScanner> getScanners()
-        {
-            if (scanners == null)
-            {
-                scanners = new HashSet<SSTableScanner>();
-                for (Object o : ((CollatingIterator)source).getIterators())
-                {
-                    
scanners.add((SSTableScanner)((FilterIterator)o).getIterator());
-                }
-            }
-            return scanners;
-        }
-
-        public String getTaskType()
-        {
-            return "Cleanup";
-        }
-    }
-
     public void checkAllColumnFamilies() throws IOException
     {
         // perform estimates
@@ -822,4 +786,63 @@ public class CompactionManager implement
             throw new IllegalStateException("May not call 
SimpleFuture.get(long, TimeUnit)");
         }
     }
+
+    private static class EchoedRow extends AbstractCompactedRow
+    {
+        private final SSTableIdentityIterator row;
+
+        public EchoedRow(SSTableIdentityIterator row)
+        {
+            super(row.getKey());
+            this.row = row;
+        }
+
+        public void write(DataOutput out) throws IOException
+        {
+            row.echoData(out);
+        }
+
+        public void update(MessageDigest digest)
+        {
+            // EchoedRow is not used in anti-entropy validation
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isEmpty()
+        {
+            return !row.hasNext();
+        }
+
+        public int columnCount()
+        {
+            return row.columnCount;
+        }
+    }
+
+    private static class CleanupInfo implements ICompactionInfo
+    {
+        private final SSTableReader sstable;
+        private final SSTableScanner scanner;
+
+        public CleanupInfo(SSTableReader sstable, SSTableScanner scanner)
+        {
+            this.sstable = sstable;
+            this.scanner = scanner;
+        }
+
+        public long getTotalBytes()
+        {
+            return scanner.getFileLength();
+        }
+
+        public long getBytesRead()
+        {
+            return scanner.getFilePointer();
+        }
+
+        public String getTaskType()
+        {
+            return "Cleanup of " + sstable.getColumnFamilyName();
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1053933&r1=1053932&r2=1053933&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Dec 30 
16:33:17 2010
@@ -571,6 +571,21 @@ public class Table
         return fullMemtables;
     }
 
+    public static void cleanupIndexEntry(ColumnFamilyStore cfs, ByteBuffer 
key, IColumn column)
+    {
+        if (column.isMarkedForDelete())
+            return;
+        int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+        DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(column.name(), 
column.value());
+        ColumnFamily cfi = cfs.newIndexedColumnFamily(column.name());
+        cfi.addTombstone(key, localDeletionTime, column.timestamp());
+        Memtable fullMemtable = 
cfs.getIndexedColumnFamilyStore(column.name()).apply(valueKey, cfi);
+        if (logger.isDebugEnabled())
+            logger.debug("removed index entry for cleaned-up value {}:{}", 
valueKey, cfi);
+        if (fullMemtable != null)
+            fullMemtable.cfs.maybeSwitchMemtable(fullMemtable, false);
+    }
+
     public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, 
SortedSet<ByteBuffer> columns, ReducingKeyIterator iter)
     {
         return new IndexBuilder(cfs, columns, iter);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1053933&r1=1053932&r2=1053933&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java 
Thu Dec 30 16:33:17 2010
@@ -44,7 +44,7 @@ implements Closeable, ICompactionInfo
 {
     private static Logger logger = 
LoggerFactory.getLogger(CompactionIterator.class);
 
-    protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
+    public static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
     protected final List<SSTableIdentityIterator> rows = new 
ArrayList<SSTableIdentityIterator>();
     private final ColumnFamilyStore cfs;

Added: cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1053933&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java Thu Dec 
30 16:33:17 2010
@@ -0,0 +1,124 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CleanupTest extends CleanupHelper
+{
+    public static final int LOOPS = 800;
+    public static final String TABLE1 = "Keyspace1";
+    public static final String CF1 = "Indexed1";
+    public static final ByteBuffer COLUMN = 
ByteBuffer.wrap("birthdate".getBytes());
+    public static final ByteBuffer VALUE = ByteBuffer.allocate(8);
+    static
+    {
+        VALUE.putLong(20101229);
+        VALUE.flip();
+    }
+
+    @Test
+    public void testCleanup() throws IOException, ExecutionException, 
InterruptedException
+    {
+        Table table = Table.open(TABLE1);
+
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1);
+        fillCF(cfs, LOOPS);
+
+        assertEquals(cfs.getIndexedColumns().iterator().next(), COLUMN);
+
+        ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN);
+
+        assertTrue(cfi.isIndexBuilt());
+
+        IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, 
VALUE);
+        IndexClause clause = new IndexClause(Arrays.asList(expr), 
FBUtilities.EMPTY_BYTE_BUFFER, Integer.MAX_VALUE);
+        IFilter filter = new IdentityQueryFilter();
+        IPartitioner p = StorageService.getPartitioner();
+        Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+        List<Row> rows = table.getColumnFamilyStore(CF1).scan(clause, range, 
filter);
+
+        assertEquals(LOOPS, rows.size());
+
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        assertNotNull(tmd);
+        assertEquals(0, tmd.getTokenToEndpointMap().size());
+
+        // Since this test has no ring cleanup will remove all
+        CompactionManager.instance.performCleanup(cfs);
+
+        // row data should be gone
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new 
IdentityQueryFilter());
+        assertEquals(0, rows.size());
+
+        // not only should it be gone but there should be no data on disk, not 
even tombstones
+        assert cfs.getSSTables().isEmpty();
+
+        // 2ary indexes should result in no results, but
+        rows = cfs.scan(clause, range, filter);
+        assertEquals(0, rows.size());
+    }
+
+    protected void fillCF(ColumnFamilyStore store, int rowsPerSSTable) throws 
ExecutionException, InterruptedException, IOException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        for (int i = 0; i < rowsPerSSTable; i++)
+        {
+            String key = String.valueOf(i);
+
+            // create a row and update the birthdate value, test that the 
index query fetches the new version
+            RowMutation rm;
+            rm = new RowMutation(TABLE1, ByteBufferUtil.bytes(key));
+            rm.add(new QueryPath(CF1, null, COLUMN), VALUE, 
System.currentTimeMillis());
+            rm.apply();
+        }
+
+        store.forceBlockingFlush();        
+        store.buildSecondaryIndexes(store.getSSTables(), 
store.getIndexedColumns());
+    }
+}


Reply via email to