Author: jbellis
Date: Sun Aug 21 04:50:55 2011
New Revision: 1159942
URL: http://svn.apache.org/viewvc?rev=1159942&view=rev
Log:
Stop reading from sstables once we know we have the most recent columns
patch by Daniel Lundin and jbellis for CASSANDRA-2498
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Aug 21 04:50:55 2011
@@ -35,6 +35,9 @@
* fix missing logging for some exceptions (CASSANDRA-2061)
* refactor and optimize ColumnFamilyStore.files(...) and
Descriptor.fromFilename(String)
and few other places responsible for work with SSTable files
(CASSANDRA-3040)
+ * Stop reading from sstables once we know we have the most recent columns,
+ for query-by-name requests (CASSANDRA-2498)
+
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Added: cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1159942&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
Sun Aug 21 04:50:55 2011
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Collates the columns selected by a single {@link QueryFilter} from the memtables and
+ * sstables of one {@link DataTracker.View}.
+ *
+ * By-name queries on column families whose default validator is not
+ * {@link CounterColumnType} are answered by {@link #collectTimeOrderedData()}, which can
+ * stop before touching every sstable; all other queries go through
+ * {@link #collectAllData()}.
+ *
+ * The sstable counter is a plain instance field updated without synchronization, so an
+ * instance is meant to serve a single read.
+ */
+public class CollationController
+{
+    private static final Logger logger = LoggerFactory.getLogger(CollationController.class);
+
+    private final DataTracker.View dataview;
+    private final ISortedColumns.Factory factory;
+    private final QueryFilter filter;
+    private final int gcBefore;
+    private final CFMetaData metadata;
+
+    // number of sstables that returned a non-null column family for the queried row
+    private int sstablesIterated = 0;
+
+    /**
+     * @param dataview memtables and sstables to read from
+     * @param factory  factory used to create the ColumnFamily that holds the result
+     * @param filter   the query (key, path and column filter) to evaluate
+     * @param metadata metadata of the column family being read
+     * @param gcBefore passed through to QueryFilter.collateColumns
+     */
+    public CollationController(DataTracker.View dataview, ISortedColumns.Factory factory, QueryFilter filter, CFMetaData metadata, int gcBefore)
+    {
+        this.dataview = dataview;
+        this.factory = factory;
+        this.filter = filter;
+        this.gcBefore = gcBefore;
+        this.metadata = metadata;
+    }
+
+    /**
+     * Runs the query, picking the time-ordered strategy for by-name filters on
+     * non-counter column families and the merge-everything strategy otherwise.
+     *
+     * @return the collated columns, or null if no memtable or sstable produced an iterator
+     */
+    public ColumnFamily getTopLevelColumns()
+    {
+        return filter.filter instanceof NamesQueryFilter && metadata.getDefaultValidator() != CounterColumnType.instance
+               ? collectTimeOrderedData()
+               : collectAllData();
+    }
+
+    /**
+     * Collects data in order of recency, using the sstable maxtimestamp data.
+     * Once we have data for all requested columns that is newer than the newest remaining maxtimestamp,
+     * we stop.
+     *
+     * Relies on dataview.sstables being ordered by descending max timestamp.
+     * Every iterator is fully drained into an in-memory container inside the try block,
+     * so closing the iterators in the finally block before the final collate is safe here.
+     */
+    private ColumnFamily collectTimeOrderedData()
+    {
+        logger.debug("collectTimeOrderedData");
+        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
+        final ColumnFamily container = ColumnFamily.create(metadata, factory, filter.filter.isReversed());
+
+        try
+        {
+            for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable)))
+            {
+                IColumnIterator iter = filter.getMemtableColumnIterator(memtable, metadata.comparator);
+                if (iter != null)
+                {
+                    iterators.add(iter);
+                    container.delete(iter.getColumnFamily());
+                    while (iter.hasNext())
+                        container.addColumn(iter.next());
+                }
+            }
+
+            // avoid changing the filter columns of the original filter
+            // (reduceNameFilter removes columns that are known to be irrelevant)
+            TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(metadata.comparator);
+            filterColumns.addAll(((NamesQueryFilter) filter.filter).columns);
+            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns));
+
+            // read sorted sstables
+            for (SSTableReader sstable : dataview.sstables)
+            {
+                long currentMaxTs = sstable.getMaxTimestamp();
+                reduceNameFilter(reducedFilter, container, currentMaxTs);
+                // every requested column already has data newer than anything left on disk
+                if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty())
+                    break;
+
+                IColumnIterator iter = reducedFilter.getSSTableColumnIterator(sstable);
+                // track the iterator even when it has no data so that it still gets closed
+                iterators.add(iter);
+                if (iter.getColumnFamily() != null)
+                {
+                    container.delete(iter.getColumnFamily());
+                    sstablesIterated++;
+                    while (iter.hasNext())
+                        container.addColumn(iter.next());
+                }
+            }
+        }
+        finally
+        {
+            for (IColumnIterator iter : iterators)
+                FileUtils.closeQuietly(iter);
+        }
+
+        // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
+        // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
+        if (iterators.isEmpty())
+            return null;
+
+        // do a final collate.  toCollate is boilerplate required to provide a CloseableIterator
+        CloseableIterator<IColumn> toCollate = new SimpleAbstractColumnIterator()
+        {
+            final Iterator<IColumn> iter = container.iterator();
+
+            protected IColumn computeNext()
+            {
+                return iter.hasNext() ? iter.next() : endOfData();
+            }
+
+            public ColumnFamily getColumnFamily()
+            {
+                return container;
+            }
+
+            public DecoratedKey getKey()
+            {
+                return filter.key;
+            }
+        };
+        ColumnFamily returnCF = container.cloneMeShallow();
+        filter.collateColumns(returnCF, Collections.singletonList(toCollate), metadata.comparator, gcBefore);
+
+        // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:
+        return returnCF;
+    }
+
+    /**
+     * Removes from the names filter every column for which returnCF already holds data
+     * newer than sstableTimestamp.
+     *
+     * @param filter           query whose NamesQueryFilter column set is pruned in place
+     * @param returnCF         data collected so far
+     * @param sstableTimestamp max timestamp of the next sstable to be read
+     */
+    private void reduceNameFilter(QueryFilter filter, ColumnFamily returnCF, long sstableTimestamp)
+    {
+        // for a super column path the requested names are sub columns of that super column
+        AbstractColumnContainer container = filter.path.superColumnName != null
+                                          ? (SuperColumn) returnCF.getColumn(filter.path.superColumnName)
+                                          : returnCF;
+        // nothing collected yet for that super column, so nothing can be pruned
+        if (container == null)
+            return;
+
+        for (Iterator<ByteBuffer> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext(); )
+        {
+            ByteBuffer filterColumn = iterator.next();
+            IColumn column = container.getColumn(filterColumn);
+            if (column != null && column.minTimestamp() > sstableTimestamp)
+                iterator.remove();
+        }
+    }
+
+    /**
+     * Collects data the brute-force way: gets an iterator for the filter in question
+     * from every memtable and sstable, then merges them together.
+     *
+     * The merge happens inside the try block: the iterators are only consumed by
+     * collateColumns, so they must stay open until it has finished.
+     */
+    private ColumnFamily collectAllData()
+    {
+        logger.debug("collectAllData");
+        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
+        ColumnFamily returnCF = ColumnFamily.create(metadata, factory, filter.filter.isReversed());
+
+        try
+        {
+            for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable)))
+            {
+                IColumnIterator iter = filter.getMemtableColumnIterator(memtable, metadata.comparator);
+                if (iter != null)
+                {
+                    returnCF.delete(iter.getColumnFamily());
+                    iterators.add(iter);
+                }
+            }
+
+            /* add the SSTables on disk */
+            for (SSTableReader sstable : dataview.sstables)
+            {
+                IColumnIterator iter = filter.getSSTableColumnIterator(sstable);
+                // track the iterator even when it has no data so that it still gets closed
+                iterators.add(iter);
+                if (iter.getColumnFamily() != null)
+                {
+                    returnCF.delete(iter.getColumnFamily());
+                    sstablesIterated++;
+                }
+            }
+
+            // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
+            // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
+            if (iterators.isEmpty())
+                return null;
+
+            filter.collateColumns(returnCF, iterators, metadata.comparator, gcBefore);
+
+            // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:
+            return returnCF;
+        }
+        finally
+        {
+            // runs after collation (or on failure), never before the iterators are consumed
+            for (IColumnIterator iter : iterators)
+                FileUtils.closeQuietly(iter);
+        }
+    }
+
+    /**
+     * @return how many sstables yielded a non-null column family for the row while this
+     *         controller was collecting data
+     */
+    public int getSstablesIterated()
+    {
+        return sstablesIterated;
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Sun Aug 21
04:50:55 2011
@@ -104,6 +104,11 @@ public class Column implements IColumn
return timestamp;
}
+ /**
+ * Returns this column's own timestamp field.
+ */
+ public long minTimestamp()
+ {
+ return timestamp;
+ }
+
public boolean isMarkedForDelete()
{
return false;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun
Aug 21 04:50:55 2011
@@ -1278,73 +1278,17 @@ public class ColumnFamilyStore implement
private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore,
ISortedColumns.Factory factory)
{
- // we are querying top-level columns, do a merging fetch with indexes.
- List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
- final ColumnFamily returnCF = ColumnFamily.create(metadata, factory,
filter.filter.isReversed());
DataTracker.View currentView = markCurrentViewReferenced();
try
{
- IColumnIterator iter;
- int sstablesToIterate = 0;
-
- /* add the current memtable */
- iter = filter.getMemtableColumnIterator(currentView.memtable,
getComparator());
- if (iter != null)
- {
- returnCF.delete(iter.getColumnFamily());
- iterators.add(iter);
- }
-
- /* add the memtables being flushed */
- for (Memtable memtable : currentView.memtablesPendingFlush)
- {
- iter = filter.getMemtableColumnIterator(memtable,
getComparator());
- if (iter != null)
- {
- returnCF.delete(iter.getColumnFamily());
- iterators.add(iter);
- }
- }
-
- /* add the SSTables on disk */
- for (SSTableReader sstable : currentView.sstables)
- {
- iter = filter.getSSTableColumnIterator(sstable);
- if (iter.getColumnFamily() != null)
- {
- returnCF.delete(iter.getColumnFamily());
- iterators.add(iter);
- sstablesToIterate++;
- }
- }
-
- recentSSTablesPerRead.add(sstablesToIterate);
- sstablesPerRead.add(sstablesToIterate);
-
- // we need to distinguish between "there is no data at all for
this row" (BF will let us rebuild that efficiently)
- // and "there used to be data, but it's gone now" (we should cache
the empty CF so we don't need to rebuild that slower)
- if (iterators.size() == 0)
- return null;
-
- filter.collateColumns(returnCF, iterators, getComparator(),
gcBefore);
-
- // Caller is responsible for final removeDeletedCF. This is
important for cacheRow to work correctly:
- return returnCF;
+ CollationController controller = new
CollationController(currentView, factory, filter, metadata, gcBefore);
+ ColumnFamily columns = controller.getTopLevelColumns();
+ recentSSTablesPerRead.add(controller.getSstablesIterated());
+ sstablesPerRead.add(controller.getSstablesIterated());
+ return columns;
}
finally
{
- /* close all cursors */
- for (IColumnIterator ci : iterators)
- {
- try
- {
- ci.close();
- }
- catch (Throwable th)
- {
- logger.error("error closing " + ci, th);
- }
- }
SSTableReader.releaseReferences(currentView.sstables);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Sun Aug
21 04:50:55 2011
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.Pair;
@@ -66,7 +69,7 @@ public class DataTracker
return view.get().memtablesPendingFlush;
}
- public Set<SSTableReader> getSSTables()
+ public List<SSTableReader> getSSTables()
{
return view.get().sstables;
}
@@ -242,7 +245,7 @@ public class DataTracker
{
view.set(new View(new Memtable(cfstore),
Collections.<Memtable>emptySet(),
- Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader>emptyList(),
Collections.<SSTableReader>emptySet()));
}
@@ -461,10 +464,15 @@ public class DataTracker
{
public final Memtable memtable;
public final Set<Memtable> memtablesPendingFlush;
- public final Set<SSTableReader> sstables;
public final Set<SSTableReader> compacting;
+ // We can't use a SortedSet here because "the ordering maintained by a
sorted set (whether or not an
+ // explicit comparator is provided) must be <i>consistent with
equals</i>." In particular,
+ // ImmutableSortedSet will ignore any objects that compare equally
with an existing Set member.
+ // Obviously, dropping sstables whose max column timestamp happens to
be equal to another's
+ // is not acceptable for us. So, we use a List instead.
+ public final List<SSTableReader> sstables;
- View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader>
sstables, Set<SSTableReader> compacting)
+ View(Memtable memtable, Set<Memtable> pendingFlush,
List<SSTableReader> sstables, Set<SSTableReader> compacting)
{
this.memtable = memtable;
this.memtablesPendingFlush = pendingFlush;
@@ -486,15 +494,14 @@ public class DataTracker
public View replaceFlushed(Memtable flushedMemtable, SSTableReader
newSSTable)
{
Set<Memtable> newPending =
ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush,
Collections.singleton(flushedMemtable)));
- Set<SSTableReader> newSSTables =
ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
- return new View(memtable, newPending, newSSTables, compacting);
+ List<SSTableReader> newSSTables = newSSTables(newSSTable);
+ return new View(memtable, newPending,
Collections.unmodifiableList(newSSTables), compacting);
}
public View replace(Collection<SSTableReader> oldSSTables,
Iterable<SSTableReader> replacements)
{
- Sets.SetView<SSTableReader> remaining = Sets.difference(sstables,
ImmutableSet.copyOf(oldSSTables));
- Set<SSTableReader> newSSTables =
ImmutableSet.<SSTableReader>builder().addAll(remaining).addAll(replacements).build();
- return new View(memtable, memtablesPendingFlush, newSSTables,
compacting);
+ List<SSTableReader> newSSTables = newSSTables(oldSSTables,
replacements);
+ return new View(memtable, memtablesPendingFlush,
Collections.unmodifiableList(newSSTables), compacting);
}
public View markCompacting(Collection<SSTableReader> tomark)
@@ -508,5 +515,27 @@ public class DataTracker
Set<SSTableReader> compactingNew =
ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
return new View(memtable, memtablesPendingFlush, sstables,
compactingNew);
}
+
+ /**
+ * Builds the sstable list for a view that gains one sstable: delegates to
+ * the two-argument overload with nothing removed and newSSTable as the
+ * only replacement.
+ */
+ private List<SSTableReader> newSSTables(SSTableReader newSSTable)
+ {
+ // not performance-sensitive, don't obsess over doing a selection
merge here
+ return newSSTables(Collections.<SSTableReader>emptyList(),
Collections.singletonList(newSSTable));
+ }
+
+ /**
+ * Returns a new mutable list holding the current sstables minus
+ * oldSSTables plus replacements, sorted with SSTable.maxTimestampComparator.
+ * The expected size is computed up front and checked with an assert after
+ * the list has been filled.
+ */
+ private List<SSTableReader> newSSTables(Collection<SSTableReader>
oldSSTables, Iterable<SSTableReader> replacements)
+ {
+ // set copy so the membership test in the loop below does not scan oldSSTables
+ ImmutableSet<SSTableReader> oldSet =
ImmutableSet.copyOf(oldSSTables);
+ int newSSTablesSize = sstables.size() - oldSSTables.size() +
Iterables.size(replacements);
+ List<SSTableReader> newSSTables = new
ArrayList<SSTableReader>(newSSTablesSize);
+ for (SSTableReader sstable : sstables)
+ {
+ if (!oldSet.contains(sstable))
+ newSSTables.add(sstable);
+ }
+ Iterables.addAll(newSSTables, replacements);
+ assert newSSTables.size() == newSSTablesSize;
+ // re-sort the whole list rather than merging the replacements in place
+ Collections.sort(newSSTables, SSTable.maxTimestampComparator);
+ return newSSTables;
+ }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Sun Aug 21
04:50:55 2011
@@ -73,7 +73,13 @@ public interface IColumn
/**
* For a standard column, this is the same as timestamp().
- * For a super column, this is max the column value timestamp of the sub
columns.
+ * For a super column, this is the max column timestamp of the sub columns.
*/
public long maxTimestamp();
+
+ /**
+ * For a standard column, this is the same as timestamp().
+ * For a super column, this is the min column timestamp of the sub columns.
+ */
+ public long minTimestamp();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Sun Aug
21 04:50:55 2011
@@ -132,11 +132,18 @@ public class SuperColumn extends Abstrac
public long maxTimestamp()
{
- long maxTimestamp = Long.MIN_VALUE;
+ long maxTimestamp = getMarkedForDeleteAt();
for (IColumn subColumn : getSubColumns())
maxTimestamp = Math.max(maxTimestamp, subColumn.maxTimestamp());
+ return maxTimestamp;
+ }
- return Math.max(maxTimestamp, getMarkedForDeleteAt());
+ /**
+ * Returns the smallest timestamp held by this super column: the minimum of
+ * getMarkedForDeleteAt() and the minTimestamp() of every sub column.
+ */
+ public long minTimestamp()
+ {
+ long minTimestamp = getMarkedForDeleteAt();
+ // fold each sub column's minimum (not its maximum) so the result is a true lower bound
+ for (IColumn subColumn : getSubColumns())
+ minTimestamp = Math.min(minTimestamp, subColumn.minTimestamp());
+ return minTimestamp;
}
public long mostRecentLiveChangeAt()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
Sun Aug 21 04:50:55 2011
@@ -104,7 +104,7 @@ public class SSTableSliceIterator implem
public boolean hasNext()
{
- return reader.hasNext();
+ return reader != null && reader.hasNext();
}
public IColumn next()
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Sun
Aug 21 04:50:55 2011
@@ -22,9 +22,7 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +58,16 @@ public abstract class SSTable
public static final String TEMPFILE_MARKER = "tmp";
+ /**
+ * Orders sstables by getMaxTimestamp() in descending order: the sstable
+ * with the larger max timestamp sorts first, and equal max timestamps
+ * compare as 0.
+ */
+ public static final Comparator<SSTableReader> maxTimestampComparator = new
Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ long ts1 = o1.getMaxTimestamp();
+ long ts2 = o2.getMaxTimestamp();
+ // explicit three-way comparison on the two long values
+ return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
+ }
+ };
+
public final Descriptor descriptor;
protected final Set<Component> components;
public final CFMetaData metadata;
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Sun Aug 21 04:50:55 2011
@@ -53,6 +53,7 @@ import static org.apache.cassandra.Util.
import static org.apache.cassandra.Util.getBytes;
import static org.junit.Assert.assertNull;
+import org.junit.Assert;
import org.junit.Test;
public class ColumnFamilyStoreTest extends CleanupHelper
@@ -69,21 +70,48 @@ public class ColumnFamilyStoreTest exten
}
@Test
+ // create two sstables, and verify that we only deserialize data from the
most recent one
+ public void testTimeSortedQuery() throws IOException, ExecutionException,
InterruptedException
+ {
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+ cfs.truncate().get();
+
+ // first flush: key1/Column1 written with timestamp 0
+ RowMutation rm;
+ rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
+ rm.add(new QueryPath("Standard1", null,
ByteBufferUtil.bytes("Column1")), ByteBufferUtil.bytes("asdf"), 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // second flush: same key and column, written with timestamp 1
+ rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
+ rm.add(new QueryPath("Standard1", null,
ByteBufferUtil.bytes("Column1")), ByteBufferUtil.bytes("asdf"), 1);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ cfs.getRecentSSTablesPerReadHistogram(); // resets counts
+ // by-name read of Column1; the assertion below expects it to be recorded in histogram bucket 0
+ cfs.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key1"), new
QueryPath("Standard1", null), ByteBufferUtil.bytes("Column1")));
+ assertEquals(1, cfs.getRecentSSTablesPerReadHistogram()[0]);
+ }
+
+ @Test
public void testGetColumnWithWrongBF() throws IOException,
ExecutionException, InterruptedException
{
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+ cfs.truncate().get();
+
List<IMutation> rms = new LinkedList<IMutation>();
RowMutation rm;
rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
rm.add(new QueryPath("Standard1", null,
ByteBufferUtil.bytes("Column1")), ByteBufferUtil.bytes("asdf"), 0);
rm.add(new QueryPath("Standard1", null,
ByteBufferUtil.bytes("Column2")), ByteBufferUtil.bytes("asdf"), 0);
rms.add(rm);
- ColumnFamilyStore store = Util.writeColumnFamily(rms);
+ Util.writeColumnFamily(rms);
- Table table = Table.open("Keyspace1");
List<SSTableReader> ssTables = table.getAllSSTables();
assertEquals(1, ssTables.size());
ssTables.get(0).forceFilterFailures();
- ColumnFamily cf =
store.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), new
QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1"))));
+ ColumnFamily cf =
cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), new
QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1"))));
assertNull(cf);
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1159942&r1=1159941&r2=1159942&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
Sun Aug 21 04:50:55 2011
@@ -185,6 +185,7 @@ public class CompactionsTest extends Cle
store.forceBlockingFlush();
}
Collection<SSTableReader> toCompact = store.getSSTables();
+ assert toCompact.size() == 2;
// Reinserting the same keys. We will compact only the previous
sstable, but we need those new ones
// to make sure we use EchoedRow, otherwise it won't be used because
purge can be done.
@@ -200,12 +201,15 @@ public class CompactionsTest extends Cle
for (SSTableReader sstable : store.getSSTables())
if (!toCompact.contains(sstable))
tmpSSTable = sstable;
+ assert tmpSSTable != null;
// Force compaction on first sstables. Since each row is in only one
sstable, we will be using EchoedRow.
Util.compact(store, toCompact, false);
+ assertEquals(2, store.getSSTables().size());
// Now, we remove the sstable that was just created to force the use
of EchoedRow (so that it doesn't hide the problem)
store.markCompacted(Collections.singleton(tmpSSTable));
+ assertEquals(1, store.getSSTables().size());
// Now assert we do have the 4 keys
assertEquals(4, Util.getRangeSlice(store).size());