Author: jbellis
Date: Sat Dec  5 00:30:22 2009
New Revision: 887467

URL: http://svn.apache.org/viewvc?rev=887467&view=rev
Log:
move resolve, diff out of Row and into CF static methods
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
    
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java 
Sat Dec  5 00:30:22 2009
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 
 import org.apache.log4j.Logger;
 
@@ -346,6 +347,23 @@
        return sb.toString();
     }
 
+    public static byte[] digest(ColumnFamily cf)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("MD5");
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            throw new AssertionError(e);
+        }
+        if (cf != null)
+            cf.updateDigest(digest);
+
+        return digest.digest();
+    }
+
     public void updateDigest(MessageDigest digest)
     {
         for (IColumn column : columns_.values())
@@ -397,6 +415,13 @@
                : DatabaseDescriptor.getSubComparator(table, columnFamilyName);
     }
 
+    public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
+    {
+        if (cf1 == null)
+            return cf2;
+        return cf1.diff(cf2);
+    }
+
     public static ColumnFamily resolve(ColumnFamily cf1, ColumnFamily cf2)
     {
         if (cf1 == null)

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java 
Sat Dec  5 00:30:22 2009
@@ -81,8 +81,8 @@
             if (command.isDigestQuery())
             {
                 if (logger_.isDebugEnabled())
-                    logger_.debug("digest is " + 
FBUtilities.bytesToHex(row.digest()));
-                readResponse = new ReadResponse(row.digest());
+                    logger_.debug("digest is " + 
FBUtilities.bytesToHex(ColumnFamily.digest(row.cf)));
+                readResponse = new ReadResponse(ColumnFamily.digest(row.cf));
             }
             else
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Sat Dec 
 5 00:30:22 2009
@@ -50,43 +50,6 @@
         this.cf = cf;
     }
 
-    /**
-     * This function will repair the current row with the input row
-     * what that means is that if there are any differences between the 2 rows 
then
-     * this function will make the current row take the latest changes.
-     */
-    public Row resolve(Row other)
-    {
-        if (cf == null)
-            return other;
-        cf.resolve(other.cf);
-        return this;
-    }
-
-    public ColumnFamily diff (Row other)
-    {
-        if (cf == null)
-            return other.cf;
-        return cf.diff(other.cf);
-    }
-
-    public byte[] digest()
-    {
-        MessageDigest digest;
-        try
-        {
-            digest = MessageDigest.getInstance("MD5");
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            throw new AssertionError(e);
-        }
-        if (cf != null)
-            cf.updateDigest(digest);
-
-        return digest.digest();
-    }
-
     @Override
     public String toString()
     {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 Sat Dec  5 00:30:22 2009
@@ -26,6 +26,7 @@
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.io.DataInputBuffer;
 import java.net.InetAddress;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -64,7 +65,7 @@
                     bufIn.reset(body, body.length);
                     ReadResponse result = 
ReadResponse.serializer().deserialize(bufIn);
                     byte[] digest = result.digest();
-                    if (!Arrays.equals(row_.digest(), digest))
+                    if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                     {
                         doReadRepair();
                         break;

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 Sat Dec  5 00:30:22 2009
@@ -62,7 +62,7 @@
        public Row resolve(List<Message> responses) throws 
DigestMismatchException, IOException
     {
         long startTime = System.currentTimeMillis();
-               List<Row> rows = new ArrayList<Row>();
+               List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
                List<InetAddress> endPoints = new ArrayList<InetAddress>();
                String key = null;
                byte[] digest = new byte[0];
@@ -87,7 +87,7 @@
             }
             else
             {
-                rows.add(result.row());
+                versions.add(result.row().cf);
                 endPoints.add(response.getFrom());
                 key = result.row().key;
             }
@@ -96,55 +96,62 @@
                // If there is a mismatch then throw an exception so that read 
repair can happen.
         if (isDigestQuery)
         {
-            for (Row row : rows)
+            for (ColumnFamily cf : versions)
             {
-                if (!Arrays.equals(row.digest(), digest))
+                if (!Arrays.equals(ColumnFamily.digest(cf), digest))
                 {
                     /* Wrap the key as the context in this exception */
-                    String s = String.format("Mismatch for key %s (%s vs %s)", 
row.key, FBUtilities.bytesToHex(row.digest()), FBUtilities.bytesToHex(digest));
+                    String s = String.format("Mismatch for key %s (%s vs %s)", 
key, FBUtilities.bytesToHex(ColumnFamily.digest(cf)), 
FBUtilities.bytesToHex(digest));
                     throw new DigestMismatchException(s);
                 }
             }
         }
 
-        Row resolved = resolveSuperset(rows);
+        ColumnFamily resolved = resolveSuperset(versions);
+        maybeScheduleRepairs(resolved, table, key, versions, endPoints);
 
-        // At this point we have the return row;
-        // Now we need to calculate the difference so that we can schedule 
read repairs
-        for (int i = 0; i < rows.size(); i++)
+        if (logger_.isDebugEnabled())
+            logger_.debug("resolve: " + (System.currentTimeMillis() - 
startTime) + " ms.");
+               return new Row(key, resolved);
+       }
+
+    /**
+     * For each row version, compare with resolved (the superset of all row 
versions);
+     * if it is missing anything, send a mutation to the endpoint it come from.
+     */
+    public static void maybeScheduleRepairs(ColumnFamily resolved, String 
table, String key, List<ColumnFamily> versions, List<InetAddress> endPoints)
+    {
+        for (int i = 0; i < versions.size(); i++)
         {
-            // since retRow is the resolved row it can be used as the super set
-            ColumnFamily diffCf = rows.get(i).diff(resolved);
+            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
             if (diffCf == null) // no repair needs to happen
                 continue;
-            // create the row mutation message based on the diff and schedule 
a read repair
+
+            // create and send the row mutation message based on the diff
             RowMutation rowMutation = new RowMutation(table, key);
             rowMutation.add(diffCf);
             RowMutationMessage rowMutationMessage = new 
RowMutationMessage(rowMutation);
             ReadRepairManager.instance().schedule(endPoints.get(i), 
rowMutationMessage);
         }
-        if (logger_.isDebugEnabled())
-            logger_.debug("resolve: " + (System.currentTimeMillis() - 
startTime) + " ms.");
-               return resolved;
-       }
+    }
 
-    static Row resolveSuperset(List<Row> rows)
+    static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
     {
-        assert rows.size() > 0;
-        Row resolved = null;
-        for (Row row : rows)
+        assert versions.size() > 0;
+        ColumnFamily resolved = null;
+        for (ColumnFamily cf : versions)
         {
-            if (row.cf != null)
+            if (cf != null)
             {
-                resolved = new Row(row.key, row.cf.cloneMe());
+                resolved = cf.cloneMe();
                 break;
             }
         }
         if (resolved == null)
-            return rows.get(0);
-        for (Row row : rows)
+            return null;
+        for (ColumnFamily cf : versions)
         {
-            resolved = resolved.resolve(row);
+            resolved.resolve(cf);
         }
         return resolved;
     }

Modified: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java 
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java 
Sat Dec  5 00:30:22 2009
@@ -61,15 +61,12 @@
     {
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
         cf1.addColumn(column("one", "A", 0));
-        Row row1 = new Row("", cf1);
 
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("one", "B", 1));
         cf2.addColumn(column("two", "C", 1));
-        Row row2 = new Row("", cf2);
 
-        row1.resolve(row2);
-        cf1 = row1.cf;
+        cf1.resolve(cf2);
         assert Arrays.equals(cf1.getColumn("one".getBytes()).value(), 
"B".getBytes());
         assert Arrays.equals(cf1.getColumn("two".getBytes()).value(), 
"C".getBytes());
     }

Modified: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java?rev=887467&r1=887466&r2=887467&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
 (original)
+++ 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
 Sat Dec  5 00:30:22 2009
@@ -17,16 +17,14 @@
     {
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
         cf1.addColumn(column("c1", "v1", 0));
-        Row row1 = new Row("key1", cf1);
 
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c1", "v2", 1));
-        Row row2 = new Row("key1", cf2);
 
-        Row resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
-        assertColumns(resolved.cf, "c1");
-        assertColumns(row1.diff(resolved), "c1");
-        assertNull(row2.diff(resolved));
+        ColumnFamily resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+        assertNull(ColumnFamily.diff(cf2, resolved));
     }
 
     @Test
@@ -34,31 +32,26 @@
     {
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
         cf1.addColumn(column("c1", "v1", 0));
-        Row row1 = new Row("key1", cf1);
 
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c2", "v2", 1));
-        Row row2 = new Row("key1", cf2);
 
-        Row resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
-        assertColumns(resolved.cf, "c1", "c2");
-        assertColumns(row1.diff(resolved), "c2");
-        assertColumns(row2.diff(resolved), "c1");
+        ColumnFamily resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1", "c2");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+        assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
     }
 
     @Test
     public void testResolveSupersetNullOne()
     {
-        Row row1 = new Row("key1", null);
-
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c2", "v2", 1));
-        Row row2 = new Row("key1", cf2);
 
-        Row resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
-        assertColumns(resolved.cf, "c2");
-        assertColumns(row1.diff(resolved), "c2");
-        assertNull(row2.diff(resolved));
+        ColumnFamily resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(null, cf2));
+        assertColumns(resolved, "c2");
+        assertColumns(ColumnFamily.diff(null, resolved), "c2");
+        assertNull(ColumnFamily.diff(cf2, resolved));
     }
 
     @Test
@@ -66,25 +59,16 @@
     {
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
         cf1.addColumn(column("c1", "v1", 0));
-        Row row1 = new Row("key1", cf1);
-
-        Row row2 = new Row("key1", null);
 
-        Row resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
-        assertColumns(resolved.cf, "c1");
-        assertNull(row1.diff(resolved));
-        assertColumns(row2.diff(resolved), "c1");
+        ColumnFamily resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, null));
+        assertColumns(resolved, "c1");
+        assertNull(ColumnFamily.diff(cf1, resolved));
+        assertColumns(ColumnFamily.diff(null, resolved), "c1");
     }
 
     @Test
     public void testResolveSupersetNullBoth()
     {
-        Row row1 = new Row("key1", null);
-        Row row2 = new Row("key1", null);
-
-        Row resolved = 
ReadResponseResolver.resolveSuperset(Arrays.asList(row1, row2));
-        assertNull(resolved.cf);
-        assertNull(row1.diff(resolved));
-        assertNull(row2.diff(resolved));
+        
assertNull(ReadResponseResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null,
 null)));
     }
 }


Reply via email to