Author: jbellis
Date: Thu May 5 17:46:23 2011
New Revision: 1099892
URL: http://svn.apache.org/viewvc?rev=1099892&view=rev
Log:
improve ignoring of obsolete mutations in index maintenance
patch by jbellis; reviewed by slebresne for CASSANDRA-2401
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1099892&r1=1099891&r2=1099892&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu May 5 17:46:23 2011
@@ -11,6 +11,7 @@
* faster flushes and compaction from fixing excessively pessimistic
rebuffering in BRAF (CASSANDRA-2581)
* include indexes in snapshots (CASSANDRA-2596)
+ * improve ignoring of obsolete mutations in index maintenance (CASSANDRA-2401)
0.7.5
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1099892&r1=1099891&r2=1099892&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Thu May 5 17:46:23 2011
@@ -1566,8 +1566,8 @@ public class ColumnFamilyStore implement
* But, if the calling StorageProxy is doing a good job estimating
data from each range, the range
* should be pretty close to `start_key`. */
if (logger.isDebugEnabled())
- logger.debug(String.format("Scanning index row %s:%s starting
with %s",
- indexCFS.columnFamily, indexKey,
indexCFS.getComparator().getString(startKey)));
+ logger.debug(String.format("Scanning index %s starting with
%s",
+ expressionString(primary),
indexCFS.getComparator().getString(startKey)));
QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
new
QueryPath(indexCFS.getColumnFamilyName()),
startKey,
@@ -1596,6 +1596,8 @@ public class ColumnFamilyStore implement
// get the row columns requested, and additional columns for
the expressions if necessary
ColumnFamily data = getColumnFamily(new QueryFilter(dk, path,
firstFilter));
+ assert data != null : String.format("No data found for %s in
%s:%s (original filter %s) from expression %s",
+ firstFilter, dk, path,
dataFilter, expressionString(primary));
logger.debug("fetched data row {}", data);
if (extraFilter != null)
{
@@ -1639,6 +1641,15 @@ public class ColumnFamilyStore implement
return rows;
}
+ private String expressionString(IndexExpression expr)
+ {
+ return String.format("'%s.%s %s %s'",
+ columnFamily,
+ getComparator().getString(expr.column_name),
+ expr.op,
+
metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
+ }
+
private IndexExpression highestSelectivityPredicate(IndexClause clause)
{
IndexExpression best = null;
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1099892&r1=1099891&r2=1099892&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
Thu May 5 17:46:23 2011
@@ -423,23 +423,26 @@ public class Table
if (oldIndexedColumns == null)
return;
- ColumnFamily cf2 = cf.cloneMe();
- for (IColumn oldColumn : oldIndexedColumns)
+ for (Iterator<ByteBuffer> iter = mutatedIndexedColumns.iterator();
iter.hasNext(); )
{
- cf2.addColumn(oldColumn);
- }
- ColumnFamily resolved = ColumnFamilyStore.removeDeleted(cf2,
Integer.MAX_VALUE);
-
- for (IColumn oldColumn : oldIndexedColumns)
- {
- IColumn resolvedColumn = resolved == null ? null :
resolved.getColumn(oldColumn.name());
- if (resolvedColumn != null && resolvedColumn.equals(oldColumn))
+ ByteBuffer name = iter.next();
+ IColumn newColumn = cf.getColumn(name); // null == row delete or
it wouldn't be marked Mutated
+ if (newColumn != null && cf.isMarkedForDelete())
+ throw new UnsupportedOperationException("Index manager cannot
support deleting and inserting into a row in the same mutation");
+ IColumn oldColumn = oldIndexedColumns.getColumn(name);
+
+ // deletions are irrelevant to the index unless we're changing
state from live -> deleted, i.e.,
+ // just updating w/ a newer tombstone doesn't matter
+ boolean bothDeleted = (newColumn == null ||
newColumn.isMarkedForDelete())
+ && (oldColumn == null ||
oldColumn.isMarkedForDelete());
+ // obsolete means either the row or the column timestamp we're
applying is older than existing data
+ boolean obsoleteRowTombstone = newColumn == null && oldColumn !=
null && cf.getMarkedForDeleteAt() < oldColumn.timestamp();
+ boolean obsoleteColumn = newColumn != null &&
(newColumn.timestamp() <= oldIndexedColumns.getMarkedForDeleteAt()
+ || (oldColumn !=
null && oldColumn.reconcile(newColumn) == oldColumn));
+ if (bothDeleted || obsoleteRowTombstone || obsoleteColumn)
{
- if (logger.isDebugEnabled())
- logger.debug("ignoring obsolete mutation of " +
cf.getComparator().getString(oldColumn.name()));
- cf.remove(oldColumn.name());
- mutatedIndexedColumns.remove(oldColumn.name());
- oldIndexedColumns.remove(oldColumn.name());
+ iter.remove();
+ oldIndexedColumns.remove(name);
}
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=1099892&r1=1099891&r2=1099892&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
Thu May 5 17:46:23 2011
@@ -26,6 +26,8 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SSTableNamesIterator;
@@ -89,4 +91,12 @@ public class NamesQueryFilter implements
{
return comparator.columnComparator;
}
+
+ @Override
+ public String toString()
+ {
+ return "NamesQueryFilter(" +
+ "columns=" + StringUtils.join(columns, ",") +
+ ')';
+ }
}
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1099892&r1=1099891&r2=1099892&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Thu May 5 17:46:23 2011
@@ -258,7 +258,7 @@ public class ColumnFamilyStoreTest exten
assert rows.size() == 1 : StringUtils.join(rows, ",");
String key = new
String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
assert "k1".equals( key );
-
+
// delete the column directly
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.delete(new QueryPath("Indexed1", null,
ByteBufferUtil.bytes("birthdate")), 1);
@@ -280,15 +280,40 @@ public class ColumnFamilyStoreTest exten
rm.apply();
rows = cfs.scan(clause, range, filter);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = new
String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
- assert "k1".equals( key );
+ key = new
String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+ assert "k1".equals( key );
+
+ // verify that row and delete w/ older timestamp does nothing
+ rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
+ rm.delete(new QueryPath("Indexed1"), 1);
+ rm.apply();
+ rows = cfs.scan(clause, range, filter);
+ assert rows.size() == 1 : StringUtils.join(rows, ",");
+ key = new
String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+ assert "k1".equals( key );
+
+ // similarly, column delete w/ older timestamp should do nothing
+ rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
+ rm.delete(new QueryPath("Indexed1", null,
ByteBufferUtil.bytes("birthdate")), 1);
+ rm.apply();
+ rows = cfs.scan(clause, range, filter);
+ assert rows.size() == 1 : StringUtils.join(rows, ",");
+ key = new
String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+ assert "k1".equals( key );
- // delete the entire row
+ // delete the entire row (w/ newer timestamp this time)
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.delete(new QueryPath("Indexed1"), 3);
rm.apply();
rows = cfs.scan(clause, range, filter);
assert rows.isEmpty() : StringUtils.join(rows, ",");
+
+ // make sure obsolete mutations don't generate an index entry
+ rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
+ rm.add(new QueryPath("Indexed1", null,
ByteBufferUtil.bytes("birthdate")), ByteBufferUtil.bytes(1L), 3);
+ rm.apply();
+ rows = cfs.scan(clause, range, filter);
+ assert rows.isEmpty() : StringUtils.join(rows, ",");
}
@Test