Since JIRA is mostly dead right now, here is the patch to test against 0.4.
On Mon, Sep 28, 2009 at 4:30 PM, Edmond Lau <[email protected]> wrote:
> On Fri, Sep 25, 2009 at 8:10 PM, Jonathan Ellis <[email protected]> wrote:
>> No, you're mixing two related concepts.
>>
>> When you do a quorum read it will fetch the actual data from one
>> replica and do digest reads from the others. If the data from the one
>> does not match the hash from the others, then you have the
>> DigestMismatchException Edmond is seeing, and read repair is performed.
>> So this is not normal and probably a bug.
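The comparison described above can be sketched as follows (illustrative names, not Cassandra's actual read-path classes): the coordinator hashes the full data it got from one replica and compares that hash against the digests returned by the others.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of the quorum-read digest check described above. Class and
// method names are illustrative, not Cassandra's actual read path.
public class DigestCheckSketch
{
    // Hash the full data response from the one replica that sent real data.
    public static byte[] md5(byte[] data)
    {
        try
        {
            return MessageDigest.getInstance("MD5").digest(data);
        }
        catch (NoSuchAlgorithmException e)
        {
            throw new AssertionError(e); // MD5 is guaranteed by the JDK
        }
    }

    // true if a replica's digest matches the hash of the full data read;
    // a false result is what surfaces as a DigestMismatchException.
    public static boolean matches(byte[] fullDataRead, byte[] replicaDigest)
    {
        return MessageDigest.isEqual(md5(fullDataRead), replicaDigest);
    }
}
```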
>
> OK - I've filed a bug in JIRA: CASSANDRA-462.
>
>>
>> P.S. Edmond: with 2 replicas, quorum is the same as all, so you will
>> not be able to perform reads if any node is unavailable. This is why,
>> if you are going to do quorum reads, you will usually have a
>> replication factor of 3 or more.
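The quorum arithmetic here is just majority-of-replicas; a quick sketch (`quorumFor` is a hypothetical helper, not a Cassandra API):

```java
// Quorum is a majority of the replicas: floor(RF / 2) + 1.
// Hypothetical helper for illustration, not an actual Cassandra API.
public class QuorumMath
{
    public static int quorumFor(int replicationFactor)
    {
        return replicationFactor / 2 + 1;
    }

    public static void main(String[] args)
    {
        // RF=2: quorum == 2 == all replicas, so one node down blocks reads.
        System.out.println(quorumFor(2)); // prints 2
        // RF=3: quorum == 2, so reads survive a single node failure.
        System.out.println(quorumFor(3)); // prints 2
    }
}
```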
>
> Yep. I only have 3 machines available at the moment as I'm testing
> Cassandra out.
>
>>
>> -Jonathan
>>
>> On Fri, Sep 25, 2009 at 8:31 PM, Sandeep Tata <[email protected]> wrote:
>>> This is a known issue, and we should perhaps open a JIRA on it.
>>> The original Dynamo approach was to have 3 mechanisms --
>>> hinted handoff, read repair, and Merkle trees -- to guarantee
>>> convergence (eventual consistency). Cassandra only has the first
>>> two. There are some corner cases where hinted handoff alone can't
>>> be relied on to guarantee convergence, which is why there's read
>>> repair on every read.
>>>
>>> Turning off read repair is a relatively simple (but risky) change to
>>> the code. However, minimizing unnecessary read repair is a lot
>>> trickier :)
>>>
>>>
>>> On Fri, Sep 25, 2009 at 5:39 PM, Edmond Lau <[email protected]> wrote:
>>>> I have a 3 node cluster with a replication factor of 2, running on 0.4
>>>> RC1. I've set both my read and write consistency levels to use a
>>>> quorum.
>>>>
>>>> I'm observing that quorum reads keep invoking read repair and log
>>>> DigestMismatchExceptions from the StorageProxy. Obviously, this
>>>> significantly reduces my read throughput. In the absence of any
>>>> additional inserts, I'd expect that read repair would happen at most
>>>> once before the 2 nodes responsible for the data both have fresh views
>>>> of the data.
>>>>
>>>> Here's what I see in my debug log for one machine on two consecutive
>>>> quorum reads for the data. I get similar messages when querying any
>>>> of the 3 nodes. Similar messages are logged on subsequent queries for
>>>> the exact same row/column. The issue happens when reading either
>>>> supercolumns or columns. Restarting the cluster has no effect.
>>>>
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,317 CassandraServer.java
>>>> (line 305) multiget
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,360 StorageProxy.java
>>>> (line 375) strongread reading data for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', columnName='null')',
>>>> columns=[www.ooyala.com,]) from [email protected]:7000
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,365 StorageProxy.java
>>>> (line 382) strongread reading digest for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', columnName='null')',
>>>> columns=[www.ooyala.com,]) from
>>>> [email protected]:7000
>>>> DEBUG [ROW-READ-STAGE:1] 2009-09-26 00:26:20,380 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to
>>>> [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:1] 2009-09-26 00:26:20,387
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:2] 2009-09-26 00:26:20,449
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,474
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,474
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> INFO [pool-1-thread-1] 2009-09-26 00:26:20,475 StorageProxy.java
>>>> (line 411) DigestMismatchException: test
>>>> DEBUG [ROW-READ-STAGE:2] 2009-09-26 00:26:20,477 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:3] 2009-09-26 00:26:20,478
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:4] 2009-09-26 00:26:20,480
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,481
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> DEBUG [pool-1-thread-1] 2009-09-26 00:26:20,481
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> INFO [pool-1-thread-1] 2009-09-26 00:26:20,482
>>>> ReadResponseResolver.java (line 148) resolve: 1 ms.
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,099 CassandraServer.java
>>>> (line 305) multiget
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,100 StorageProxy.java
>>>> (line 375) strongread reading data for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', columnName='null')',
>>>> columns=[www.ooyala.com,]) from [email protected]:7000
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,100 StorageProxy.java
>>>> (line 382) strongread reading digest for
>>>> SliceByNamesReadCommand(table='Analytics', key='test',
>>>> columnParent='QueryPath(columnFamilyName='Domain',
>>>> superColumnName='null', columnName='null')',
>>>> columns=[www.ooyala.com,]) from
>>>> [email protected]:7000
>>>> DEBUG [ROW-READ-STAGE:1] 2009-09-26 00:27:22,103 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to
>>>> [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:1] 2009-09-26 00:27:22,103
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:2] 2009-09-26 00:27:22,107
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,108
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 1
>>>> ms.
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,108
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> INFO [pool-1-thread-2] 2009-09-26 00:27:22,109 StorageProxy.java
>>>> (line 411) DigestMismatchException: test
>>>> DEBUG [ROW-READ-STAGE:2] 2009-09-26 00:27:22,114 ReadVerbHandler.java
>>>> (line 100) Read key test; sending response to [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:3] 2009-09-26 00:27:22,114
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [RESPONSE-STAGE:4] 2009-09-26 00:27:22,205
>>>> ResponseVerbHandler.java (line 34) Processing response on a callback
>>>> from [email protected]:7000
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,206
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> DEBUG [pool-1-thread-2] 2009-09-26 00:27:22,206
>>>> ReadResponseResolver.java (line 84) Response deserialization time : 0
>>>> ms.
>>>> INFO [pool-1-thread-2] 2009-09-26 00:27:22,207
>>>> ReadResponseResolver.java (line 148) resolve: 1 ms.
>>>>
>>>> Thoughts?
>>>>
>>>> Edmond
>>>>
>>>
>>
>
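Before the patch, each column produced its own digest and the results were XOR-combined (and the old Column digest hashed only name and timestamp, not the value); the patch instead feeds every column's name, value, and timestamp into one shared MessageDigest. A minimal sketch of the new scheme, with illustrative names rather than the actual patch code:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of the digest scheme in the patch below: every column updates a
// single shared MessageDigest, so the value and timestamp of each column
// contribute to the final hash. Names here are illustrative.
public class DigestSketch
{
    public static byte[] rowDigest(byte[][] names, byte[][] values, long[] timestamps)
    {
        MessageDigest digest;
        try
        {
            digest = MessageDigest.getInstance("MD5");
        }
        catch (NoSuchAlgorithmException e)
        {
            throw new AssertionError(e); // MD5 is guaranteed by the JDK
        }
        for (int i = 0; i < names.length; i++)
        {
            digest.update(names[i]);
            digest.update(values[i]); // the old digest ignored the value
            // encode the timestamp as 8 big-endian bytes
            digest.update(java.nio.ByteBuffer.allocate(8).putLong(timestamps[i]).array());
        }
        return digest.digest(); // 16 bytes for MD5
    }
}
```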
Index: test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
===================================================================
--- test/unit/org/apache/cassandra/db/ColumnFamilyTest.java (revision 820115)
+++ test/unit/org/apache/cassandra/db/ColumnFamilyTest.java (working copy)
@@ -132,11 +132,4 @@
//addcolumns will only add if timestamp >= old timestamp
assert Arrays.equals(val, cf_result.getColumn("col2".getBytes()).value());
}
-
- @Test
- public void testEmptyDigest()
- {
- ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1");
- assert cf.digest().length == 0;
- }
}
Index: src/java/org/apache/cassandra/db/ColumnFamily.java
===================================================================
--- src/java/org/apache/cassandra/db/ColumnFamily.java (revision 820115)
+++ src/java/org/apache/cassandra/db/ColumnFamily.java (working copy)
@@ -26,12 +26,11 @@
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.security.MessageDigest;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -347,21 +346,12 @@
return sb.toString();
}
- public byte[] digest()
+ public void updateDigest(MessageDigest digest)
{
- byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
for (IColumn column : columns_.values())
{
- if (xorHash.length == 0)
- {
- xorHash = column.digest();
- }
- else
- {
- xorHash = FBUtilities.xor(xorHash, column.digest());
- }
+ column.updateDigest(digest);
}
- return xorHash;
}
public long getMarkedForDeleteAt()
Index: src/java/org/apache/cassandra/db/IColumn.java
===================================================================
--- src/java/org/apache/cassandra/db/IColumn.java (revision 820115)
+++ src/java/org/apache/cassandra/db/IColumn.java (working copy)
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.util.Collection;
+import java.security.MessageDigest;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -42,7 +43,7 @@
public void addColumn(IColumn column);
public IColumn diff(IColumn column);
public int getObjectCount();
- public byte[] digest();
+ public void updateDigest(MessageDigest digest);
public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
public String getString(AbstractType comparator);
}
Index: src/java/org/apache/cassandra/db/Row.java
===================================================================
--- src/java/org/apache/cassandra/db/Row.java (revision 820115)
+++ src/java/org/apache/cassandra/db/Row.java (working copy)
@@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Set;
import java.util.Arrays;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -170,20 +172,22 @@
public byte[] digest()
{
- Set<String> cfamilies = columnFamilies_.keySet();
- byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
- for (String cFamily : cfamilies)
+ MessageDigest digest;
+ try
{
- if (xorHash.length == 0)
- {
- xorHash = columnFamilies_.get(cFamily).digest();
- }
- else
- {
- xorHash = FBUtilities.xor(xorHash, columnFamilies_.get(cFamily).digest());
- }
+ digest = MessageDigest.getInstance("MD5");
}
- return xorHash;
+ catch (NoSuchAlgorithmException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ for (String cFamily : columnFamilies_.keySet())
+ {
+ columnFamilies_.get(cFamily).updateDigest(digest);
+ }
+
+ return digest.digest();
}
void clear()
Index: src/java/org/apache/cassandra/db/SuperColumn.java
===================================================================
--- src/java/org/apache/cassandra/db/SuperColumn.java (revision 820115)
+++ src/java/org/apache/cassandra/db/SuperColumn.java (working copy)
@@ -23,15 +23,13 @@
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.security.MessageDigest;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.MarshalException;
public final class SuperColumn implements IColumn, IColumnContainer
@@ -270,17 +268,24 @@
return null;
}
- public byte[] digest()
+ public void updateDigest(MessageDigest digest)
{
- byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
- if(name_ == null)
- return xorHash;
- xorHash = name_.clone();
- for(IColumn column : columns_.values())
- {
- xorHash = FBUtilities.xor(xorHash, column.digest());
- }
- return xorHash;
+ assert name_ != null;
+ digest.update(name_);
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try
+ {
+ buffer.writeLong(markedForDeleteAt);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ digest.update(buffer.getData(), 0, buffer.getLength());
+ for (IColumn column : columns_.values())
+ {
+ column.updateDigest(digest);
+ }
}
public String getString(AbstractType comparator)
Index: src/java/org/apache/cassandra/db/Column.java
===================================================================
--- src/java/org/apache/cassandra/db/Column.java (revision 820115)
+++ src/java/org/apache/cassandra/db/Column.java (working copy)
@@ -20,10 +20,13 @@
import java.util.Collection;
import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.io.IOException;
import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.DataOutputBuffer;
/**
@@ -172,13 +175,21 @@
return null;
}
- public byte[] digest()
+ public void updateDigest(MessageDigest digest)
{
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(name);
- stringBuilder.append(":");
- stringBuilder.append(timestamp);
- return stringBuilder.toString().getBytes();
+ digest.update(name);
+ digest.update(value);
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try
+ {
+ buffer.writeLong(timestamp);
+ buffer.writeBoolean(isMarkedForDelete);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ digest.update(buffer.getData(), 0, buffer.getLength());
}
public int getLocalDeletionTime()
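The Column.updateDigest hunk above writes the timestamp and deletion flag through Cassandra's internal DataOutputBuffer; a rough JDK-only equivalent of the same encoding, for reference (names are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Rough JDK-only equivalent of the Column.updateDigest hunk above:
// DataOutputBuffer is Cassandra-internal, so this sketch uses
// DataOutputStream over a byte array to get the same big-endian encoding.
public class ColumnDigestSketch
{
    public static void updateDigest(MessageDigest digest, byte[] name, byte[] value,
                                    long timestamp, boolean isMarkedForDelete)
    {
        digest.update(name);
        digest.update(value);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try
        {
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(timestamp);            // 8 big-endian bytes
            out.writeBoolean(isMarkedForDelete); // 1 byte: 0 or 1
        }
        catch (IOException e)
        {
            throw new RuntimeException(e); // cannot happen for an in-memory stream
        }
        digest.update(bytes.toByteArray());
    }

    // Convenience wrapper: MD5 digest of a single column.
    public static byte[] digestOf(byte[] name, byte[] value, long timestamp, boolean deleted)
    {
        try
        {
            MessageDigest d = MessageDigest.getInstance("MD5");
            updateDigest(d, name, value, timestamp, deleted);
            return d.digest();
        }
        catch (NoSuchAlgorithmException e)
        {
            throw new AssertionError(e);
        }
    }
}
```

Note that the deletion flag now feeds into the digest, so a tombstoned column and a live one with the same name and timestamp no longer hash identically.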