Author: jbellis
Date: Thu Dec 30 16:18:33 2010
New Revision: 1053927
URL: http://svn.apache.org/viewvc?rev=1053927&view=rev
Log:
merge from 0.7.0
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java
Modified:
cassandra/branches/cassandra-0.7/ (props changed)
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
/cassandra/branches/cassandra-0.6:922689-1053763
/cassandra/branches/cassandra-0.7:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0:1053690-1053891
+/cassandra/branches/cassandra-0.7.0:1053690-1053891,1053922
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/trunk:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Dec 30 16:18:33 2010
@@ -22,6 +22,7 @@ dev
* change RandomPartitioner min token to -1 to avoid collision w/
tokens on actual nodes (CASSANDRA-1901)
* examine the right nibble when validating TimeUUID (CASSANDRA-1910)
+ * include secondary indexes in cleanup (CASSANDRA-1916)
* CFS.scrubDataDirectories should also cleanup invalid secondary indexes
(CASSANDRA-1904)
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053763
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1053891,1053922
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053763
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1053891,1053922
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053763
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1053891,1053922
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053763
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1053891,1053922
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053763
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1053891,1053922
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
Thu Dec 30 16:18:33 2010
@@ -18,10 +18,12 @@
package org.apache.cassandra.db;
+import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
@@ -31,7 +33,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.collections.PredicateUtils;
-import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.commons.collections.iterators.FilterIterator;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -47,7 +48,6 @@ import org.apache.cassandra.io.sstable.*
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -368,96 +368,109 @@ public class CompactionManager implement
}
/**
- * This function is used to do the anti compaction process , it spits out
the file which has keys that belong to a given range
- * If the target is not specified it spits out the file as a compacted
file with the unecessary ranges wiped out.
+ * This function goes over each file and removes the keys that the node is
not responsible for
+ * and only keeps keys that this node is responsible for.
*
- * @param cfs
- * @param sstables
- * @param ranges
- * @param target
- * @return
- * @throws java.io.IOException
+ * @throws IOException
*/
- private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
- throws IOException
+ private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
{
+ assert !cfs.isIndex();
Table table = cfs.table;
- logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") +
"]");
- // Calculate the expected compacted filesize
- long expectedRangeFileSize =
cfs.getExpectedCompactedFileSize(sstables) / 2;
- String compactionFileLocation =
table.getDataFileLocation(expectedRangeFileSize);
- if (compactionFileLocation == null)
- {
- throw new UnsupportedOperationException("disk full");
- }
+ Collection<Range> ranges =
StorageService.instance.getLocalRanges(table.name);
- List<SSTableReader> results = new ArrayList<SSTableReader>();
- long startTime = System.currentTimeMillis();
- long totalkeysWritten = 0;
-
- int expectedBloomFilterSize =
Math.max(DatabaseDescriptor.getIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
- if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
-
- SSTableWriter writer = null;
- CompactionIterator ci = new AntiCompactionIterator(cfs, sstables,
ranges, (int) (System.currentTimeMillis() / 1000) -
cfs.metadata.getGcGraceSeconds(), cfs.isCompleteSSTables(sstables));
- Iterator<AbstractCompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
- executor.beginCompaction(cfs, ci);
-
- try
+ for (SSTableReader sstable : cfs.getSSTables())
{
- if (!nni.hasNext())
+ logger.info("AntiCompacting " + sstable);
+ // Calculate the expected compacted filesize
+ long expectedRangeFileSize =
cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
+ String compactionFileLocation =
table.getDataFileLocation(expectedRangeFileSize);
+ if (compactionFileLocation == null)
+ throw new UnsupportedOperationException("disk full");
+
+ long startTime = System.currentTimeMillis();
+ long totalkeysWritten = 0;
+
+ int expectedBloomFilterSize =
Math.max(DatabaseDescriptor.getIndexInterval(),
+
(int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)) / 2));
+ if (logger.isDebugEnabled())
+ logger.debug("Expected bloom filter size : " +
expectedBloomFilterSize);
+
+ SSTableWriter writer = null;
+ SSTableScanner scanner =
sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+ SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
+ executor.beginCompaction(cfs, new CleanupInfo(sstable, scanner));
+ try
{
- return results;
+ while (scanner.hasNext())
+ {
+ SSTableIdentityIterator row = (SSTableIdentityIterator)
scanner.next();
+ if (Range.isTokenInRanges(row.getKey().token, ranges))
+ {
+ writer = maybeCreateWriter(cfs,
compactionFileLocation, expectedBloomFilterSize, writer);
+ writer.append(new EchoedRow(row));
+ totalkeysWritten++;
+ }
+ else
+ {
+ while (row.hasNext())
+ {
+ IColumn column = row.next();
+ if (indexedColumns.contains(column.name()))
+ Table.cleanupIndexEntry(cfs, row.getKey().key,
column);
+ }
+ }
+ }
+ }
+ finally
+ {
+ scanner.close();
}
- while (nni.hasNext())
+ List<SSTableReader> results = new ArrayList<SSTableReader>();
+ if (writer != null)
{
- AbstractCompactedRow row = nni.next();
- if (writer == null)
+ SSTableReader newSstable =
writer.closeAndOpenReader(sstable.maxDataAge);
+ results.add(newSstable);
+
+ String format = "AntiCompacted to %s. %,d to %,d (~%d%% of
original) bytes for %,d keys. Time: %,dms.";
+ long dTime = System.currentTimeMillis() - startTime;
+ long startsize = sstable.length();
+ long endsize = newSstable.length();
+ double ratio = (double)endsize / (double)startsize;
+ logger.info(String.format(format, writer.getFilename(),
startsize, endsize, (int)(ratio*100), totalkeysWritten, dTime));
+ }
+
+ // flush to ensure we don't lose the tombstones on a restart,
since they are not commitlog'd
+ for (ByteBuffer columnName : cfs.getIndexedColumns())
+ {
+ try
{
- FileUtils.createDirectory(compactionFileLocation);
- String newFilename = new
File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
- writer = new SSTableWriter(newFilename,
expectedBloomFilterSize, cfs.metadata, cfs.partitioner);
+
cfs.getIndexedColumnFamilyStore(columnName).forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
}
- writer.append(row);
- totalkeysWritten++;
}
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
}
- finally
- {
- ci.close();
- }
-
- if (writer != null)
- {
- results.add(writer.closeAndOpenReader(getMaxDataAge(sstables)));
-
- String format = "AntiCompacted to %s. %,d to %,d (~%d%% of
original) bytes for %,d keys. Time: %,dms.";
- long dTime = System.currentTimeMillis() - startTime;
- long startsize = SSTable.getTotalBytes(sstables);
- long endsize = results.get(0).length();
- double ratio = (double)endsize / (double)startsize;
- logger.info(String.format(format, writer.getFilename(), startsize,
endsize, (int)(ratio*100), totalkeysWritten, dTime));
- }
-
- return results;
}
- /**
- * This function goes over each file and removes the keys that the node is
not responsible for
- * and only keeps keys that this node is responsible for.
- *
- * @throws IOException
- */
- private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
+ private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String
compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
+ throws IOException
{
- Collection<SSTableReader> originalSSTables = cfs.getSSTables();
- List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables,
StorageService.instance.getLocalRanges(cfs.table.name), null);
- if (!sstables.isEmpty())
+ if (writer == null)
{
- cfs.replaceCompactedSSTables(originalSSTables, sstables);
+ FileUtils.createDirectory(compactionFileLocation);
+ String newFilename = new
File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
+ writer = new SSTableWriter(newFilename, expectedBloomFilterSize,
cfs.metadata, cfs.partitioner);
}
+ return writer;
}
/**
@@ -633,55 +646,6 @@ public class CompactionManager implement
}
}
- private static class AntiCompactionIterator extends CompactionIterator
- {
- private Set<SSTableScanner> scanners;
-
- public AntiCompactionIterator(ColumnFamilyStore cfStore,
Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore,
boolean isMajor)
- throws IOException
- {
- super(cfStore, getCollatedRangeIterator(sstables, ranges),
gcBefore, isMajor);
- }
-
- private static Iterator
getCollatedRangeIterator(Collection<SSTableReader> sstables, final
Collection<Range> ranges)
- throws IOException
- {
- org.apache.commons.collections.Predicate rangesPredicate = new
org.apache.commons.collections.Predicate()
- {
- public boolean evaluate(Object row)
- {
- return
Range.isTokenInRanges(((SSTableIdentityIterator)row).getKey().token, ranges);
- }
- };
- // TODO CollatingIterator iter =
FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
- CollatingIterator iter = FBUtilities.getCollatingIterator();
- for (SSTableReader sstable : sstables)
- {
- SSTableScanner scanner =
sstable.getDirectScanner(FILE_BUFFER_SIZE);
- iter.addIterator(new FilterIterator(scanner, rangesPredicate));
- }
- return iter;
- }
-
- public Iterable<SSTableScanner> getScanners()
- {
- if (scanners == null)
- {
- scanners = new HashSet<SSTableScanner>();
- for (Object o : ((CollatingIterator)source).getIterators())
- {
-
scanners.add((SSTableScanner)((FilterIterator)o).getIterator());
- }
- }
- return scanners;
- }
-
- public String getTaskType()
- {
- return "Cleanup";
- }
- }
-
public void checkAllColumnFamilies() throws IOException
{
// perform estimates
@@ -821,4 +785,63 @@ public class CompactionManager implement
throw new IllegalStateException("May not call
SimpleFuture.get(long, TimeUnit)");
}
}
+
+ private static class EchoedRow extends AbstractCompactedRow
+ {
+ private final SSTableIdentityIterator row;
+
+ public EchoedRow(SSTableIdentityIterator row)
+ {
+ super(row.getKey());
+ this.row = row;
+ }
+
+ public void write(DataOutput out) throws IOException
+ {
+ row.echoData(out);
+ }
+
+ public void update(MessageDigest digest)
+ {
+ // EchoedRow is not used in anti-entropy validation
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isEmpty()
+ {
+ return !row.hasNext();
+ }
+
+ public int columnCount()
+ {
+ return row.columnCount;
+ }
+ }
+
+ private static class CleanupInfo implements ICompactionInfo
+ {
+ private final SSTableReader sstable;
+ private final SSTableScanner scanner;
+
+ public CleanupInfo(SSTableReader sstable, SSTableScanner scanner)
+ {
+ this.sstable = sstable;
+ this.scanner = scanner;
+ }
+
+ public long getTotalBytes()
+ {
+ return scanner.getFileLength();
+ }
+
+ public long getBytesRead()
+ {
+ return scanner.getFilePointer();
+ }
+
+ public String getTaskType()
+ {
+ return "Cleanup of " + sstable.getColumnFamilyName();
+ }
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
Thu Dec 30 16:18:33 2010
@@ -571,6 +571,21 @@ public class Table
return fullMemtables;
}
+ public static void cleanupIndexEntry(ColumnFamilyStore cfs, ByteBuffer
key, IColumn column)
+ {
+ if (column.isMarkedForDelete())
+ return;
+ int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+ DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(column.name(),
column.value());
+ ColumnFamily cfi = cfs.newIndexedColumnFamily(column.name());
+ cfi.addTombstone(key, localDeletionTime, column.timestamp());
+ Memtable fullMemtable =
cfs.getIndexedColumnFamilyStore(column.name()).apply(valueKey, cfi);
+ if (logger.isDebugEnabled())
+ logger.debug("removed index entry for cleaned-up value {}:{}",
valueKey, cfi);
+ if (fullMemtable != null)
+ fullMemtable.cfs.maybeSwitchMemtable(fullMemtable, false);
+ }
+
public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs,
SortedSet<ByteBuffer> columns, ReducingKeyIterator iter)
{
return new IndexBuilder(cfs, columns, iter);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
Thu Dec 30 16:18:33 2010
@@ -44,7 +44,7 @@ implements Closeable, ICompactionInfo
{
private static Logger logger =
LoggerFactory.getLogger(CompactionIterator.class);
- protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
+ public static final int FILE_BUFFER_SIZE = 1024 * 1024;
protected final List<SSTableIdentityIterator> rows = new
ArrayList<SSTableIdentityIterator>();
private final ColumnFamilyStore cfs;
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1053927&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java
(added)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java
Thu Dec 30 16:18:33 2010
@@ -0,0 +1,124 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CleanupTest extends CleanupHelper
+{
+ public static final int LOOPS = 800;
+ public static final String TABLE1 = "Keyspace1";
+ public static final String CF1 = "Indexed1";
+ public static final ByteBuffer COLUMN =
ByteBuffer.wrap("birthdate".getBytes());
+ public static final ByteBuffer VALUE = ByteBuffer.allocate(8);
+ static
+ {
+ VALUE.putLong(20101229);
+ VALUE.flip();
+ }
+
+ @Test
+ public void testCleanup() throws IOException, ExecutionException,
InterruptedException
+ {
+ Table table = Table.open(TABLE1);
+
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1);
+ fillCF(cfs, LOOPS);
+
+ assertEquals(cfs.getIndexedColumns().iterator().next(), COLUMN);
+
+ ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN);
+
+ assertTrue(cfi.isIndexBuilt());
+
+ IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ,
VALUE);
+ IndexClause clause = new IndexClause(Arrays.asList(expr),
FBUtilities.EMPTY_BYTE_BUFFER, Integer.MAX_VALUE);
+ IFilter filter = new IdentityQueryFilter();
+ IPartitioner p = StorageService.getPartitioner();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+ List<Row> rows = table.getColumnFamilyStore(CF1).scan(clause, range,
filter);
+
+ assertEquals(LOOPS, rows.size());
+
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ assertNotNull(tmd);
+ assertEquals(0, tmd.getTokenToEndpointMap().size());
+
+ // Since this test has no ring cleanup will remove all
+ CompactionManager.instance.performCleanup(cfs);
+
+ // row data should be gone
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new
IdentityQueryFilter());
+ assertEquals(0, rows.size());
+
+ // not only should it be gone but there should be no data on disk, not
even tombstones
+ assert cfs.getSSTables().isEmpty();
+
+ // 2ary indexes should result in no results, but
+ rows = cfs.scan(clause, range, filter);
+ assertEquals(0, rows.size());
+ }
+
+ protected void fillCF(ColumnFamilyStore store, int rowsPerSSTable) throws
ExecutionException, InterruptedException, IOException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ for (int i = 0; i < rowsPerSSTable; i++)
+ {
+ String key = String.valueOf(i);
+
+ // create a row and update the birthdate value, test that the
index query fetches the new version
+ RowMutation rm;
+ rm = new RowMutation(TABLE1, ByteBufferUtil.bytes(key));
+ rm.add(new QueryPath(CF1, null, COLUMN), VALUE,
System.currentTimeMillis());
+ rm.apply();
+ }
+
+ store.forceBlockingFlush();
+ store.buildSecondaryIndexes(store.getSSTables(),
store.getIndexedColumns());
+ }
+}