Author: jbellis
Date: Sat Aug 21 17:30:30 2010
New Revision: 987800
URL: http://svn.apache.org/viewvc?rev=987800&view=rev
Log:
Prevent writes of obsolete data from updating secondary (2ary) indexes. Patch by jbellis.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=987800&r1=987799&r2=987800&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sat Aug 21
17:30:30 2010
@@ -334,20 +334,19 @@ public class Table
CommitLog.instance().add(mutation, serializedMutation);
DecoratedKey key =
StorageService.getPartitioner().decorateKey(mutation.key());
- for (ColumnFamily columnFamily : mutation.getColumnFamilies())
+ for (ColumnFamily cf : mutation.getColumnFamilies())
{
- ColumnFamilyStore cfs =
columnFamilyStores.get(columnFamily.id());
+ ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
if (cfs == null)
{
- logger.error("Attempting to mutate non-existant column
family " + columnFamily.id());
+ logger.error("Attempting to mutate non-existant column
family " + cf.id());
continue;
}
- ColumnFamily oldIndexedColumns;
SortedSet<byte[]> mutatedIndexedColumns = null;
for (byte[] column : cfs.getIndexedColumns())
{
- if (columnFamily.getColumnNames().contains(column))
+ if (cf.getColumnNames().contains(column))
{
if (mutatedIndexedColumns == null)
mutatedIndexedColumns = new
TreeSet<byte[]>(FBUtilities.byteArrayComparator);
@@ -358,7 +357,7 @@ public class Table
if (mutatedIndexedColumns == null)
{
// just update the actual value, no extra synchronization
- applyCF(cfs, key, columnFamily, memtablesToFlush);
+ applyCF(cfs, key, cf, memtablesToFlush);
}
else
{
@@ -366,19 +365,33 @@ public class Table
{
// read old indexed values
QueryFilter filter = QueryFilter.getNamesFilter(key,
new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
- oldIndexedColumns = cfs.getColumnFamily(filter);
+ ColumnFamily oldIndexedColumns =
cfs.getColumnFamily(filter);
+
+ // ignore obsolete column updates
+ if (oldIndexedColumns != null)
+ {
+ for (IColumn oldColumn : oldIndexedColumns)
+ {
+ if (cfs.metadata.reconciler.reconcile((Column)
oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn))
+ {
+ cf.remove(oldColumn.name());
+
mutatedIndexedColumns.remove(oldColumn.name());
+ oldIndexedColumns.remove(oldColumn.name());
+ }
+ }
+ }
// apply the mutation
- applyCF(cfs, key, columnFamily, memtablesToFlush);
+ applyCF(cfs, key, cf, memtablesToFlush);
// add new index entries
for (byte[] columnName : mutatedIndexedColumns)
{
- IColumn column =
columnFamily.getColumn(columnName);
+ IColumn column = cf.getColumn(columnName);
DecoratedKey<LocalToken> valueKey =
cfs.getIndexKeyFor(columnName, column.value());
- ColumnFamily cf =
cfs.newIndexedColumnFamily(columnName);
- cf.addColumn(new Column(mutation.key(),
ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
-
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf,
memtablesToFlush);
+ ColumnFamily cfi =
cfs.newIndexedColumnFamily(columnName);
+ cfi.addColumn(new Column(mutation.key(),
ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
+
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi,
memtablesToFlush);
}
// remove the old index entries
@@ -390,9 +403,9 @@ public class Table
byte[] columnName = entry.getKey();
IColumn column = entry.getValue();
DecoratedKey<LocalToken> valueKey =
cfs.getIndexKeyFor(columnName, column.value());
- ColumnFamily cf =
cfs.newIndexedColumnFamily(columnName);
- cf.deleteColumn(mutation.key(),
localDeletionTime, column.clock());
-
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf,
memtablesToFlush);
+ ColumnFamily cfi =
cfs.newIndexedColumnFamily(columnName);
+ cfi.deleteColumn(mutation.key(),
localDeletionTime, column.clock());
+
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi,
memtablesToFlush);
}
}
}
@@ -400,7 +413,7 @@ public class Table
ColumnFamily cachedRow = cfs.getRawCachedRow(key);
if (cachedRow != null)
- cachedRow.addAll(columnFamily);
+ cachedRow.addAll(cf);
}
}
finally
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=987800&r1=987799&r2=987800&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Sat Aug 21 17:30:30 2010
@@ -220,12 +220,13 @@ public class ColumnFamilyStoreTest exten
@Test
public void testIndexUpdate() throws IOException
{
- RowMutation rm;
+ Table table = Table.open("Keyspace2");
+ // create a row and update the birthdate value, test that the index
query fetches the new version
+ RowMutation rm;
rm = new RowMutation("Keyspace2", "k1".getBytes());
rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")),
FBUtilities.toByteArray(1L), new TimestampClock(1));
rm.apply();
-
rm = new RowMutation("Keyspace2", "k1".getBytes());
rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")),
FBUtilities.toByteArray(2L), new TimestampClock(2));
rm.apply();
@@ -235,12 +236,20 @@ public class ColumnFamilyStoreTest exten
IFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
- List<Row> rows =
Table.open("Keyspace2").getColumnFamilyStore("Indexed1").scan(clause, range,
filter);
+ List<Row> rows = table.getColumnFamilyStore("Indexed1").scan(clause,
range, filter);
assert rows.size() == 0;
expr = new IndexExpression("birthdate".getBytes("UTF8"),
IndexOperator.EQ, FBUtilities.toByteArray(2L));
clause = new IndexClause(Arrays.asList(expr),
ArrayUtils.EMPTY_BYTE_ARRAY, 100);
- rows =
Table.open("Keyspace2").getColumnFamilyStore("Indexed1").scan(clause, range,
filter);
+ rows = table.getColumnFamilyStore("Indexed1").scan(clause, range,
filter);
+ assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+
+ // update the birthdate value with an OLDER timestamp, and test that
the index ignores this
+ rm = new RowMutation("Keyspace2", "k1".getBytes());
+ rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")),
FBUtilities.toByteArray(3L), new TimestampClock(0));
+ rm.apply();
+
+ rows = table.getColumnFamilyStore("Indexed1").scan(clause, range,
filter);
assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
}