Author: jbellis
Date: Fri Apr 24 00:46:32 2009
New Revision: 768109
URL: http://svn.apache.org/viewvc?rev=768109&view=rev
Log:
Use resolve (moved to CF from CFS) rather than duplicating code in repair to do
the same thing. This automatically gets us most of the way to handling
tombstones in read repair; the rest is minor tweaks to CF.diff and SC.diff.
Added tests.
Patch by jbellis; reviewed by Jun Rao for #87.
Added:
incubator/cassandra/trunk/test/org/apache/cassandra/db/RowTest.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=768109&r1=768108&r2=768109&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri
Apr 24 00:46:32 2009
@@ -312,32 +312,23 @@
}
/*
- * This function will repair a list of columns
- * If there are any columns in the external list which are not present
- * internally then they are added ( this might have to change depending on
- * how we implement delete repairs).
- * Also if there are any columns in teh internal and not in the external
- * they are kept intact.
- * Else the one with the greatest timestamp is considered latest.
- */
- void repair(ColumnFamily columnFamily)
- {
- for (IColumn column : columnFamily.getAllColumns()) {
- addColumn(column);
- }
- }
-
- /*
* This function will calculate the differnce between 2 column families
* the external input is considered the superset of internal
* so there are no deletes in the diff.
*/
- ColumnFamily diff(ColumnFamily columnFamily)
+ ColumnFamily diff(ColumnFamily cfComposite)
{
- ColumnFamily cfDiff = new ColumnFamily(columnFamily.name(),
columnFamily.type_);
- Map<String, IColumn> columns = columnFamily.getColumns();
- Set<String> cNames = columns.keySet();
+ ColumnFamily cfDiff = new ColumnFamily(cfComposite.name(),
cfComposite.type_);
+ if (cfComposite.getMarkedForDeleteAt() > getMarkedForDeleteAt())
+ {
+ cfDiff.delete(cfComposite.getLocalDeletionTime(),
cfComposite.getMarkedForDeleteAt());
+ }
+ // (don't need to worry about cfComposite containing IColumns that are shadowed by
+ // the delete tombstone, since cfComposite was generated by CF.resolve, which
+ // takes care of those for us.)
+ Map<String, IColumn> columns = cfComposite.getColumns();
+ Set<String> cNames = columns.keySet();
for ( String cName : cNames )
{
IColumn columnInternal = columns_.get(cName);
@@ -355,7 +346,8 @@
}
}
}
- if(cfDiff.getColumns().size() != 0)
+
+ if (!cfDiff.getColumns().isEmpty() || cfDiff.isMarkedForDelete())
return cfDiff;
else
return null;
@@ -437,6 +429,40 @@
return type_;
}
+    /**
+     * Merge all columnFamilies into a single instance, with only the newest
+     * versions of columns preserved.
+     *
+     * CF-level deletion info is merged by taking the maximum localDeletionTime
+     * and, separately, the maximum markedForDeleteAt over all inputs, so the two
+     * resulting values may come from different inputs.
+     *
+     * @param columnFamilies versions of one column family; all elements must
+     *        share the same name (asserted)
+     * @return the merged column family, or null if columnFamilies is empty
+     */
+    static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
+    {
+        if (columnFamilies.isEmpty())
+            return null;
+
+        // start from an empty shallow clone rather than the first instance itself,
+        // so that we don't include potential deleted columns from the first instance;
+        // the first instance's columns are merged by the loop like every other input.
+        // NOTE(review): assumes cloneMeShallow copies no columns -- confirm.
+        ColumnFamily cf = columnFamilies.get(0).cloneMeShallow();
+
+        // merge
+        for (ColumnFamily cf2 : columnFamilies)
+        {
+            assert cf.name().equals(cf2.name());
+            cf.addColumns(cf2);
+            cf.delete(Math.max(cf.getLocalDeletionTime(), cf2.getLocalDeletionTime()),
+                      Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
+        }
+        return cf;
+    }
+
public static class ColumnFamilySerializer implements
ICompactSerializer2<ColumnFamily>
{
/*
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=768109&r1=768108&r2=768109&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
Fri Apr 24 00:46:32 2009
@@ -580,39 +580,17 @@
columnFamilies.add(columnFamily);
}
}
-
- /** merge all columnFamilies into a single instance, with only the newest
versions of columns preserved. */
- static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
- {
- int size = columnFamilies.size();
- if (size == 0)
- return null;
-
- // start from nothing so that we don't include potential deleted
columns from the first instance
- ColumnFamily cf0 = columnFamilies.get(0);
- ColumnFamily cf = cf0.cloneMeShallow();
-
- // merge
- for (ColumnFamily cf2 : columnFamilies)
- {
- assert cf.name().equals(cf2.name());
- cf.addColumns(cf2);
- cf.delete(Math.max(cf.getLocalDeletionTime(),
cf2.getLocalDeletionTime()),
- Math.max(cf.getMarkedForDeleteAt(),
cf2.getMarkedForDeleteAt()));
- }
- return cf;
- }
/** like resolve, but leaves the resolved CF as the only item in the list
*/
private static void merge(List<ColumnFamily> columnFamilies)
{
- ColumnFamily cf = resolve(columnFamilies);
+ ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
columnFamilies.clear();
columnFamilies.add(cf);
}
private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily>
columnFamilies) {
- ColumnFamily cf = resolve(columnFamilies);
+ ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
return removeDeleted(cf);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java?rev=768109&r1=768108&r2=768109&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java Fri Apr 24
00:46:32 2009
@@ -26,6 +26,7 @@
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
+import java.util.Arrays;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -110,22 +111,21 @@
* what that means is that if there are any differences between the 2 rows
then
* this fn will make the current row take the latest changes .
*/
- public void repair(Row row)
+ public void repair(Row rowOther)
{
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
- Set<String> cfNames = columnFamilies.keySet();
-
- for (String cfName : cfNames)
+ for (ColumnFamily cfOld : rowOther.getColumnFamilies())
{
- ColumnFamily cf = columnFamilies_.get(cfName);
+ ColumnFamily cf = columnFamilies_.get(cfOld.name());
if (cf == null)
{
- cf = new ColumnFamily(cfName, cf.type());
- columnFamilies_.put(cfName, cf);
+ addColumnFamily(cfOld);
+ }
+ else
+ {
+ columnFamilies_.remove(cf.name());
+ addColumnFamily(ColumnFamily.resolve(Arrays.asList(cfOld,
cf)));
}
- cf.repair(columnFamilies.get(cfName));
}
-
}
/*
@@ -136,19 +136,18 @@
* difference and does not take care of what needs to be removed from the
current row to make
* it same as the input row.
*/
- public Row diff(Row rowNew)
+ public Row diff(Row rowComposite)
{
Row rowDiff = new Row(key_);
- for (ColumnFamily cfNew : rowNew.getColumnFamilies())
+ for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
{
- ColumnFamily cf = columnFamilies_.get(cfNew.name());
- ColumnFamily cfDiff = null;
+ ColumnFamily cf = columnFamilies_.get(cfComposite.name());
if (cf == null)
- rowDiff.addColumnFamily(cfNew);
+ rowDiff.addColumnFamily(cfComposite);
else
{
- cfDiff = cf.diff(cfNew);
+ ColumnFamily cfDiff = cf.diff(cfComposite);
if (cfDiff != null)
rowDiff.addColumnFamily(cfDiff);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java?rev=768109&r1=768108&r2=768109&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java Fri
Apr 24 00:46:32 2009
@@ -230,11 +230,18 @@
return columns_.size();
}
- public IColumn diff(IColumn column)
+ public IColumn diff(IColumn columnNew)
{
- IColumn columnDiff = new SuperColumn(column.name());
+ IColumn columnDiff = new SuperColumn(columnNew.name());
+ if (columnNew.getMarkedForDeleteAt() > getMarkedForDeleteAt())
+ {
+
((SuperColumn)columnDiff).markForDeleteAt(columnNew.getLocalDeletionTime(),
columnNew.getMarkedForDeleteAt());
+ }
- for (IColumn subColumn : column.getSubColumns())
+ // (don't need to worry about columnNew containing subColumns that are shadowed by
+ // the delete tombstone, since columnNew was generated by CF.resolve, which
+ // takes care of those for us.)
+ for (IColumn subColumn : columnNew.getSubColumns())
{
IColumn columnInternal = columns_.get(subColumn.name());
if(columnInternal == null )
@@ -251,7 +258,7 @@
}
}
- if (!columnDiff.getSubColumns().isEmpty())
+ if (!columnDiff.getSubColumns().isEmpty() ||
columnNew.isMarkedForDelete())
return columnDiff;
else
return null;
Modified:
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=768109&r1=768108&r2=768109&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Fri Apr 24 00:46:32 2009
@@ -257,7 +257,7 @@
List<ColumnFamily> families = store.getColumnFamilies("key1",
"Super1", new IdentityFilter());
assert families.get(0).getAllColumns().first().getMarkedForDeleteAt()
== 1; // delete marker, just added
assert !families.get(1).getAllColumns().first().isMarkedForDelete();
// flushed old version
- ColumnFamily resolved = ColumnFamilyStore.resolve(families);
+ ColumnFamily resolved = ColumnFamily.resolve(families);
assert resolved.getAllColumns().first().getMarkedForDeleteAt() == 1;
Collection<IColumn> subColumns =
resolved.getAllColumns().first().getSubColumns();
assert subColumns.size() == 1;
Added: incubator/cassandra/trunk/test/org/apache/cassandra/db/RowTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/RowTest.java?rev=768109&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/RowTest.java (added)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/RowTest.java Fri Apr
24 00:46:32 2009
@@ -0,0 +1,80 @@
+package org.apache.cassandra.db;
+
+import java.util.Arrays;
+
+import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests for the tombstone handling in ColumnFamily.diff and SuperColumn.diff,
+ * and for Row.repair merging another row's column families into this one.
+ */
+public class RowTest
+{
+    /**
+     * The composite CF carries only a CF-level tombstone; the diff should carry
+     * that tombstone and no columns.
+     */
+    @Test
+    public void testDiffColumnFamily()
+    {
+        ColumnFamily cf1 = new ColumnFamily("Standard1", "Standard");
+        cf1.addColumn("one", "onev".getBytes(), 0);
+
+        ColumnFamily cf2 = new ColumnFamily("Standard1", "Standard");
+        cf2.delete(0, 0);
+
+        ColumnFamily cfDiff = cf1.diff(cf2);
+        assertEquals(cfDiff.getColumns().size(), 0);
+        assertEquals(cfDiff.getMarkedForDeleteAt(), 0);
+    }
+
+    /**
+     * The composite super column carries only a deletion marker; the diff should
+     * carry that marker and no subcolumns.
+     */
+    @Test
+    public void testDiffSuperColumn()
+    {
+        SuperColumn sc1 = new SuperColumn("one");
+        sc1.addColumn("subcolumn", new Column("subcolumn", "A".getBytes(), 0));
+
+        SuperColumn sc2 = new SuperColumn("one");
+        sc2.markForDeleteAt(0, 0);
+
+        SuperColumn scDiff = (SuperColumn)sc1.diff(sc2);
+        assertEquals(scDiff.getSubColumns().size(), 0);
+        assertEquals(scDiff.getMarkedForDeleteAt(), 0);
+    }
+
+    /**
+     * Repairing row1 from row2 should take row2's newer value for a shared column,
+     * add columns only row2 has, and add column families only row2 has.
+     */
+    @Test
+    public void testRepair()
+    {
+        Row row1 = new Row();
+        ColumnFamily cf1 = new ColumnFamily("Standard1", "Standard");
+        cf1.addColumn("one", "A".getBytes(), 0);
+        row1.addColumnFamily(cf1);
+
+        Row row2 = new Row();
+        ColumnFamily cf2 = new ColumnFamily("Standard1", "Standard");
+        cf2.addColumn("one", "B".getBytes(), 1);
+        cf2.addColumn("two", "C".getBytes(), 1);
+        ColumnFamily cf3 = new ColumnFamily("Standard2", "Standard");
+        cf3.addColumn("three", "D".getBytes(), 1);
+        row2.addColumnFamily(cf2);
+        row2.addColumnFamily(cf3);
+
+        row1.repair(row2);
+        // check the repaired CF held by row1, not the cf2 input (which trivially contains "two")
+        ColumnFamily cfRepaired = row1.getColumnFamily("Standard1");
+        assert Arrays.equals(cfRepaired.getColumn("one").value(), "B".getBytes());
+        assert Arrays.equals(cfRepaired.getColumn("two").value(), "C".getBytes());
+        ColumnFamily cfAdded = row1.getColumnFamily("Standard2");
+        assert cfAdded != null;
+        assert Arrays.equals(cfAdded.getColumn("three").value(), "D".getBytes());
+    }
+}