Since JIRA is mostly dead right now, here is the patch to test against 0.4.

On Mon, Sep 28, 2009 at 4:30 PM, Edmond Lau <[email protected]> wrote:
> On Fri, Sep 25, 2009 at 8:10 PM, Jonathan Ellis <[email protected]> wrote:
>> No, you're mixing two related concepts.
>>
>> When you do a quorum read it will fetch the actual data from one
>> replica and do digest reads from the others.  If the data from the one
>> does not match the hash from the others, then you have the
>> digestmismatchexception Edmond is seeing and read repair is performed.
>>  So this is not normal and probably a bug.
>
> Ok - I've filed a bug in jira: CASSANDRA-462.
>
>>
>> P.S. Edmund: with 2 replicas, quorum is the same as all.  So you will
>> not be able to perform reads if any node is available.  This is why
>> usually if you are going to do quorum reads you will have replication
>> factor 3 or more.
>
> Yep.  I only have 3 machines available at the moment as I'm testing
> cassandra out.
>
>>
>> -Jonathan
>>
>> On Fri, Sep 25, 2009 at 8:31 PM, Sandeep Tata <[email protected]> wrote:
>>> This is a known issue, and we should perhaps open a JIRA on it.
>>> The original Dynamo approach was to have 3 mechanisms --
>>> HintedHandoff, read-repair, and Merk trees to guarantee convergence
>>> (eventual consistency). Cassandra only has the first two. There are
>>> some corner cases where hinted-handoff alone can't be relied on to
>>> guarantee convergence which is why there's read-repair  on every read.
>>>
>>> Turning off read repair is a relatively simple (but risky) change to
>>> the code. However, minimizing unnecessary read repair is a lot
>>> trickier :)
>>>
>>>
>>> On Fri, Sep 25, 2009 at 5:39 PM, Edmond Lau <[email protected]> wrote:
>>>> I have a 3 node cluster with a replication factor of 2, running on 0.4
>>>> RC1.  I've set both my read and write consistency levels to use a
>>>> quorum.
>>>>
>>>> I'm observing that quorum reads keep invoking read repair and log
>>>> DigestMismatchExceptions from the StorageProxy.  Obviously, this
>>>> significantly reduces my read throughput.  In the absence of any
>>>> additional inserts, I'd expect that read repair would happen at most
>>>> once before the 2 nodes responsible for the data both have fresh views
>>>> of the data.
>>>>
>>>> Here's what I see in my debug log for one machine on two consecutive
>>>> quorum reads for the data.  I get similar messages when querying any
>>>> of the 3 nodes.  Similar messages are logged on subsequent queries for
>>>> the exact same row/column.  The issue happens when reading both
>>>> supercolumns or columns.  Restarting the cluster has no effect.
>>>>
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,317 CassandraServer.java
>>>> (line 305) multiget
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,360 StorageProxy.java
>>>> (line 375) strongread reading data for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', co\
>>>> lumnName='null')', columns=[www.ooyala.com,]) from [email protected]:7000
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,365 StorageProxy.java
>>>> (line 382) strongread reading digest for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', \
>>>> columnName='null')', columns=[www.ooyala.com,]) from 
>>>> [email protected]:7000
>>>> DEBUG [ROW-READ-STAGE:1] 2009-09-26 00:26:20,380 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to
>>>> [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:1] 2009-09-26 00:26:20,387
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:2] 2009-09-26 00:26:20,449
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,474
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,474
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>>  INFO [pool-1-thread-1] 2009-09-26 00:26:20,475 StorageProxy.java
>>>> (line 411) DigestMismatchException: test
>>>> DEBUG [ROW-READ-STAGE:2] 2009-09-26 00:26:20,477 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:3] 2009-09-26 00:26:20,478
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:4] 2009-09-26 00:26:20,480
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,481
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,481
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>>  INFO [pool-1-thread-1] 2009-09-26 00:26:20,482
>>>> ReadResponseResolver.java (line 148) resolve: 1 ms.
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,099 CassandraServer.java
>>>> (line 305) multiget
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,100 StorageProxy.java
>>>> (line 375) strongread reading data for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', co\
>>>> lumnName='null')', columns=[www.ooyala.com,]) from [email protected]:7000
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,100 StorageProxy.java
>>>> (line 382) strongread reading digest for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', \
>>>> columnName='null')', columns=[www.ooyala.com,]) from 
>>>> [email protected]:7000
>>>> DEBUG [ROW-READ-STAGE:1] 2009-09-26 00:27:22,103 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to
>>>> [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:1] 2009-09-26 00:27:22,103
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:2] 2009-09-26 00:27:22,107
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,108
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 1
>>>> ms.
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,108
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>>  INFO [pool-1-thread-2] 2009-09-26 00:27:22,109 StorageProxy.java
>>>> (line 411) DigestMismatchException: test
>>>> DEBUG [ROW-READ-STAGE:2] 2009-09-26 00:27:22,114 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:3] 2009-09-26 00:27:22,114
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:4] 2009-09-26 00:27:22,205
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,206
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,206
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>>  INFO [pool-1-thread-2] 2009-09-26 00:27:22,207
>>>> ReadResponseResolver.java (line 148) resolve: 1 ms.
>>>>
>>>> Thoughts?
>>>>
>>>> Edmond
>>>>
>>>
>>
>
Index: test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
===================================================================
--- test/unit/org/apache/cassandra/db/ColumnFamilyTest.java	(revision 820115)
+++ test/unit/org/apache/cassandra/db/ColumnFamilyTest.java	(working copy)
@@ -132,11 +132,4 @@
         //addcolumns will only add if timestamp >= old timestamp
         assert Arrays.equals(val, cf_result.getColumn("col2".getBytes()).value());
     }
-
-    @Test
-    public void testEmptyDigest()
-    {
-        ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1");
-        assert cf.digest().length == 0;
-    }
 }
Index: src/java/org/apache/cassandra/db/ColumnFamily.java
===================================================================
--- src/java/org/apache/cassandra/db/ColumnFamily.java	(revision 820115)
+++ src/java/org/apache/cassandra/db/ColumnFamily.java	(working copy)
@@ -26,12 +26,11 @@
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.security.MessageDigest;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -347,21 +346,12 @@
     	return sb.toString();
     }
 
-    public byte[] digest()
+    public void updateDigest(MessageDigest digest)
     {
-        byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
         for (IColumn column : columns_.values())
         {
-            if (xorHash.length == 0)
-            {
-                xorHash = column.digest();
-            }
-            else
-            {
-                xorHash = FBUtilities.xor(xorHash, column.digest());
-            }
+            column.updateDigest(digest);
         }
-        return xorHash;
     }
 
     public long getMarkedForDeleteAt()
Index: src/java/org/apache/cassandra/db/IColumn.java
===================================================================
--- src/java/org/apache/cassandra/db/IColumn.java	(revision 820115)
+++ src/java/org/apache/cassandra/db/IColumn.java	(working copy)
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
+import java.security.MessageDigest;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 
@@ -42,7 +43,7 @@
     public void addColumn(IColumn column);
     public IColumn diff(IColumn column);
     public int getObjectCount();
-    public byte[] digest();
+    public void updateDigest(MessageDigest digest);
     public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
     public String getString(AbstractType comparator);
 }
Index: src/java/org/apache/cassandra/db/Row.java
===================================================================
--- src/java/org/apache/cassandra/db/Row.java	(revision 820115)
+++ src/java/org/apache/cassandra/db/Row.java	(working copy)
@@ -26,6 +26,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Arrays;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -170,20 +172,22 @@
 
     public byte[] digest()
     {
-        Set<String> cfamilies = columnFamilies_.keySet();
-        byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
-        for (String cFamily : cfamilies)
+        MessageDigest digest;
+        try
         {
-            if (xorHash.length == 0)
-            {
-                xorHash = columnFamilies_.get(cFamily).digest();
-            }
-            else
-            {
-                xorHash = FBUtilities.xor(xorHash, columnFamilies_.get(cFamily).digest());
-            }
+            digest = MessageDigest.getInstance("MD5");
         }
-        return xorHash;
+        catch (NoSuchAlgorithmException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        for (String cFamily : columnFamilies_.keySet())
+        {
+            columnFamilies_.get(cFamily).updateDigest(digest);
+        }
+
+        return digest.digest();
     }
 
     void clear()
Index: src/java/org/apache/cassandra/db/SuperColumn.java
===================================================================
--- src/java/org/apache/cassandra/db/SuperColumn.java	(revision 820115)
+++ src/java/org/apache/cassandra/db/SuperColumn.java	(working copy)
@@ -23,15 +23,13 @@
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.security.MessageDigest;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.MarshalException;
 
 
 public final class SuperColumn implements IColumn, IColumnContainer
@@ -270,17 +268,24 @@
         	return null;
     }
 
-    public byte[] digest()
+    public void updateDigest(MessageDigest digest)
     {
-    	byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
-    	if(name_ == null)
-    		return xorHash;
-    	xorHash = name_.clone();
-    	for(IColumn column : columns_.values())
-    	{
-			xorHash = FBUtilities.xor(xorHash, column.digest());
-    	}
-    	return xorHash;
+        assert name_ != null;
+        digest.update(name_);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
+        {
+            buffer.writeLong(markedForDeleteAt);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        digest.update(buffer.getData(), 0, buffer.getLength());
+        for (IColumn column : columns_.values())
+        {
+            column.updateDigest(digest);
+        }
     }
 
     public String getString(AbstractType comparator)
Index: src/java/org/apache/cassandra/db/Column.java
===================================================================
--- src/java/org/apache/cassandra/db/Column.java	(revision 820115)
+++ src/java/org/apache/cassandra/db/Column.java	(working copy)
@@ -20,10 +20,13 @@
 
 import java.util.Collection;
 import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.io.IOException;
 
 import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.DataOutputBuffer;
 
 
 /**
@@ -172,13 +175,21 @@
         return null;
     }
 
-    public byte[] digest()
+    public void updateDigest(MessageDigest digest)
     {
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append(name);
-        stringBuilder.append(":");
-        stringBuilder.append(timestamp);
-        return stringBuilder.toString().getBytes();
+        digest.update(name);
+        digest.update(value);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
+        {
+            buffer.writeLong(timestamp);
+            buffer.writeBoolean(isMarkedForDelete);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        digest.update(buffer.getData(), 0, buffer.getLength());
     }
 
     public int getLocalDeletionTime()

Reply via email to