Author: jbellis
Date: Sat Dec 5 00:30:22 2009
New Revision: 887467
URL: http://svn.apache.org/viewvc?rev=887467&view=rev
Log:
move resolve, diff out of Row and into CF static methods
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
Sat Dec 5 00:30:22 2009
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListMap;
import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import org.apache.log4j.Logger;
@@ -346,6 +347,23 @@
return sb.toString();
}
+ public static byte[] digest(ColumnFamily cf)
+ {
+ MessageDigest digest;
+ try
+ {
+ digest = MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ throw new AssertionError(e);
+ }
+ if (cf != null)
+ cf.updateDigest(digest);
+
+ return digest.digest();
+ }
+
public void updateDigest(MessageDigest digest)
{
for (IColumn column : columns_.values())
@@ -397,6 +415,13 @@
: DatabaseDescriptor.getSubComparator(table, columnFamilyName);
}
+ public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
+ {
+ if (cf1 == null)
+ return cf2;
+ return cf1.diff(cf2);
+ }
+
public static ColumnFamily resolve(ColumnFamily cf1, ColumnFamily cf2)
{
if (cf1 == null)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Sat Dec 5 00:30:22 2009
@@ -81,8 +81,8 @@
if (command.isDigestQuery())
{
if (logger_.isDebugEnabled())
- logger_.debug("digest is " +
FBUtilities.bytesToHex(row.digest()));
- readResponse = new ReadResponse(row.digest());
+ logger_.debug("digest is " +
FBUtilities.bytesToHex(ColumnFamily.digest(row.cf)));
+ readResponse = new ReadResponse(ColumnFamily.digest(row.cf));
}
else
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Sat Dec
5 00:30:22 2009
@@ -50,43 +50,6 @@
this.cf = cf;
}
- /**
- * This function will repair the current row with the input row
- * what that means is that if there are any differences between the 2 rows
then
- * this function will make the current row take the latest changes.
- */
- public Row resolve(Row other)
- {
- if (cf == null)
- return other;
- cf.resolve(other.cf);
- return this;
- }
-
- public ColumnFamily diff (Row other)
- {
- if (cf == null)
- return other.cf;
- return cf.diff(other.cf);
- }
-
- public byte[] digest()
- {
- MessageDigest digest;
- try
- {
- digest = MessageDigest.getInstance("MD5");
- }
- catch (NoSuchAlgorithmException e)
- {
- throw new AssertionError(e);
- }
- if (cf != null)
- cf.updateDigest(digest);
-
- return digest.digest();
- }
-
@Override
public String toString()
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
Sat Dec 5 00:30:22 2009
@@ -26,6 +26,7 @@
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.io.DataInputBuffer;
import java.net.InetAddress;
import org.apache.cassandra.net.IAsyncCallback;
@@ -64,7 +65,7 @@
bufIn.reset(body, body.length);
ReadResponse result =
ReadResponse.serializer().deserialize(bufIn);
byte[] digest = result.digest();
- if (!Arrays.equals(row_.digest(), digest))
+ if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
{
doReadRepair();
break;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Sat Dec 5 00:30:22 2009
@@ -62,7 +62,7 @@
public Row resolve(List<Message> responses) throws
DigestMismatchException, IOException
{
long startTime = System.currentTimeMillis();
- List<Row> rows = new ArrayList<Row>();
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
List<InetAddress> endPoints = new ArrayList<InetAddress>();
String key = null;
byte[] digest = new byte[0];
@@ -87,7 +87,7 @@
}
else
{
- rows.add(result.row());
+ versions.add(result.row().cf);
endPoints.add(response.getFrom());
key = result.row().key;
}
@@ -96,55 +96,62 @@
// If there is a mismatch then throw an exception so that read
repair can happen.
if (isDigestQuery)
{
- for (Row row : rows)
+ for (ColumnFamily cf : versions)
{
- if (!Arrays.equals(row.digest(), digest))
+ if (!Arrays.equals(ColumnFamily.digest(cf), digest))
{
/* Wrap the key as the context in this exception */
- String s = String.format("Mismatch for key %s (%s vs %s)",
row.key, FBUtilities.bytesToHex(row.digest()), FBUtilities.bytesToHex(digest));
+ String s = String.format("Mismatch for key %s (%s vs %s)",
key, FBUtilities.bytesToHex(ColumnFamily.digest(cf)),
FBUtilities.bytesToHex(digest));
throw new DigestMismatchException(s);
}
}
}
- Row resolved = resolveSuperset(rows);
+ ColumnFamily resolved = resolveSuperset(versions);
+ maybeScheduleRepairs(resolved, table, key, versions, endPoints);
- // At this point we have the return row;
- // Now we need to calculate the difference so that we can schedule
read repairs
- for (int i = 0; i < rows.size(); i++)
+ if (logger_.isDebugEnabled())
+ logger_.debug("resolve: " + (System.currentTimeMillis() -
startTime) + " ms.");
+ return new Row(key, resolved);
+ }
+
+ /**
+ * For each row version, compare with resolved (the superset of all row
versions);
+ * if it is missing anything, send a mutation to the endpoint it came from.
+ */
+ public static void maybeScheduleRepairs(ColumnFamily resolved, String
table, String key, List<ColumnFamily> versions, List<InetAddress> endPoints)
+ {
+ for (int i = 0; i < versions.size(); i++)
{
- // since retRow is the resolved row it can be used as the super set
- ColumnFamily diffCf = rows.get(i).diff(resolved);
+ ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
if (diffCf == null) // no repair needs to happen
continue;
- // create the row mutation message based on the diff and schedule
a read repair
+
+ // create and send the row mutation message based on the diff
RowMutation rowMutation = new RowMutation(table, key);
rowMutation.add(diffCf);
RowMutationMessage rowMutationMessage = new
RowMutationMessage(rowMutation);
ReadRepairManager.instance().schedule(endPoints.get(i),
rowMutationMessage);
}
- if (logger_.isDebugEnabled())
- logger_.debug("resolve: " + (System.currentTimeMillis() -
startTime) + " ms.");
- return resolved;
- }
+ }
- static Row resolveSuperset(List<Row> rows)
+ static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
{
- assert rows.size() > 0;
- Row resolved = null;
- for (Row row : rows)
+ assert versions.size() > 0;
+ ColumnFamily resolved = null;
+ for (ColumnFamily cf : versions)
{
- if (row.cf != null)
+ if (cf != null)
{
- resolved = new Row(row.key, row.cf.cloneMe());
+ resolved = cf.cloneMe();
break;
}
}
if (resolved == null)
- return rows.get(0);
- for (Row row : rows)
+ return null;
+ for (ColumnFamily cf : versions)
{
- resolved = resolved.resolve(row);
+ resolved.resolve(cf);
}
return resolved;
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
Sat Dec 5 00:30:22 2009
@@ -61,15 +61,12 @@
{
ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
cf1.addColumn(column("one", "A", 0));
- Row row1 = new Row("", cf1);
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("one", "B", 1));
cf2.addColumn(column("two", "C", 1));
- Row row2 = new Row("", cf2);
- row1.resolve(row2);
- cf1 = row1.cf;
+ cf1.resolve(cf2);
assert Arrays.equals(cf1.getColumn("one".getBytes()).value(),
"B".getBytes());
assert Arrays.equals(cf1.getColumn("two".getBytes()).value(),
"C".getBytes());
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
Sat Dec 5 00:30:22 2009
@@ -17,16 +17,14 @@
{
ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
cf1.addColumn(column("c1", "v1", 0));
- Row row1 = new Row("key1", cf1);
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("c1", "v2", 1));
- Row row2 = new Row("key1", cf2);
- Row resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
- assertColumns(resolved.cf, "c1");
- assertColumns(row1.diff(resolved), "c1");
- assertNull(row2.diff(resolved));
+ ColumnFamily resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+ assertNull(ColumnFamily.diff(cf2, resolved));
}
@Test
@@ -34,31 +32,26 @@
{
ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
cf1.addColumn(column("c1", "v1", 0));
- Row row1 = new Row("key1", cf1);
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("c2", "v2", 1));
- Row row2 = new Row("key1", cf2);
- Row resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
- assertColumns(resolved.cf, "c1", "c2");
- assertColumns(row1.diff(resolved), "c2");
- assertColumns(row2.diff(resolved), "c1");
+ ColumnFamily resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1", "c2");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+ assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
}
@Test
public void testResolveSupersetNullOne()
{
- Row row1 = new Row("key1", null);
-
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("c2", "v2", 1));
- Row row2 = new Row("key1", cf2);
- Row resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
- assertColumns(resolved.cf, "c2");
- assertColumns(row1.diff(resolved), "c2");
- assertNull(row2.diff(resolved));
+ ColumnFamily resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(null, cf2));
+ assertColumns(resolved, "c2");
+ assertColumns(ColumnFamily.diff(null, resolved), "c2");
+ assertNull(ColumnFamily.diff(cf2, resolved));
}
@Test
@@ -66,25 +59,16 @@
{
ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
cf1.addColumn(column("c1", "v1", 0));
- Row row1 = new Row("key1", cf1);
-
- Row row2 = new Row("key1", null);
- Row resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
- assertColumns(resolved.cf, "c1");
- assertNull(row1.diff(resolved));
- assertColumns(row2.diff(resolved), "c1");
+ ColumnFamily resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, null));
+ assertColumns(resolved, "c1");
+ assertNull(ColumnFamily.diff(cf1, resolved));
+ assertColumns(ColumnFamily.diff(null, resolved), "c1");
}
@Test
public void testResolveSupersetNullBoth()
{
- Row row1 = new Row("key1", null);
- Row row2 = new Row("key1", null);
-
- Row resolved =
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
- assertNull(resolved.cf);
- assertNull(row1.diff(resolved));
- assertNull(row2.diff(resolved));
+
assertNull(ReadResponseResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null,
null)));
}
}