Author: jbellis
Date: Wed Mar 10 17:17:38 2010
New Revision: 921456
URL: http://svn.apache.org/viewvc?rev=921456&view=rev
Log:
fix thread-safety problems with IAsyncCallback. patch by jbellis; reviewed by
Roger Schildmeijer and gdusbabek for CASSANDRA-864
Modified:
incubator/cassandra/branches/cassandra-0.6/CHANGES.txt
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Modified: incubator/cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.6/CHANGES.txt Wed Mar 10 17:17:38
2010
@@ -18,6 +18,8 @@
* fix classpath in cassandra-cli.bat for Windows (CASSANDRA-858)
* allow re-specifying host, port to cassandra-cli if invalid ones
are first tried (CASSANDRA-867)
+ * fix race condition handling rpc timeout in the coordinator
+ (CASSANDRA-864)
0.6.0-beta1/beta2
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
Wed Mar 10 17:17:38 2010
@@ -18,11 +18,16 @@
package org.apache.cassandra.net;
+/**
+ * implementors of IAsyncCallback need to make sure that any public methods
+ * are threadsafe with respect to response() being called from the message
+ * service. In particular, if any shared state is referenced, making
+ * response alone synchronized will not suffice.
+ */
public interface IAsyncCallback
{
/**
* @param msg response received.
- * Calls to response() are serialized by ResponseVerbHandler.
*/
public void response(Message msg);
}
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Wed Mar 10 17:17:38 2010
@@ -32,10 +32,7 @@ public class ResponseVerbHandler impleme
{
if (logger_.isDebugEnabled())
logger_.debug("Processing response on a callback from " +
message.getMessageId() + "@" + message.getFrom());
- synchronized (cb)
- {
- cb.response(message);
- }
+ cb.response(message);
}
else
{
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
Wed Mar 10 17:17:38 2010
@@ -21,23 +21,26 @@ package org.apache.cassandra.service;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.ColumnFamily;
-import java.net.InetAddress;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
-
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.FBUtilities;
class ConsistencyManager implements Runnable
@@ -47,9 +50,10 @@ class ConsistencyManager implements Runn
class DigestResponseHandler implements IAsyncCallback
{
- List<Message> responses_ = new ArrayList<Message>();
+ Collection<Message> responses_ = new
LinkedBlockingQueue<Message>();
- public void response(Message msg)
+ // synchronized so "size() == " works
+ public synchronized void response(Message msg)
{
responses_.add(msg);
if (responses_.size() == ConsistencyManager.this.replicas_.size())
@@ -94,17 +98,18 @@ class ConsistencyManager implements Runn
static class DataRepairHandler implements IAsyncCallback,
ICacheExpungeHook<String, String>
{
- private List<Message> responses_ = new ArrayList<Message>();
- private IResponseResolver<Row> readResponseResolver_;
- private int majority_;
+ private final Collection<Message> responses_ = new
LinkedBlockingQueue<Message>();
+ private final IResponseResolver<Row> readResponseResolver_;
+ private final int majority_;
DataRepairHandler(int responseCount, IResponseResolver<Row>
readResponseResolver)
{
readResponseResolver_ = readResponseResolver;
majority_ = (responseCount / 2) + 1;
}
-
- public void response(Message message)
+
+ // synchronized so the " == majority" is safe
+ public synchronized void response(Message message)
{
if (logger_.isDebugEnabled())
logger_.debug("Received responses in
DataRepairHandler : " + message.toString());
@@ -120,7 +125,7 @@ class ConsistencyManager implements Runn
{
try
{
- readResponseResolver_.resolve(new
ArrayList<Message>(responses_));
+ readResponseResolver_.resolve(responses_);
}
catch (Exception ex)
{
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -53,12 +53,11 @@ public class DatacenterSyncWriteResponse
}
@Override
- public void response(Message message)
+ // synchronized for the benefit of dcResponses and responseCounts.
"responses" itself
+ // is inherited from WRH and is concurrent.
+ // TODO can we use concurrent structures instead?
+ public synchronized void response(Message message)
{
- if (condition.isSignaled())
- {
- return;
- }
try
{
String dataCenter = endPointSnitch.getLocation(message.getFrom());
@@ -89,8 +88,7 @@ public class DatacenterSyncWriteResponse
throw new RuntimeException(e);
}
responses.add(message);
- // If done then the response count will be empty after removing
- // everything.
+ // If done then the response count will be empty
if (responseCounts.isEmpty())
{
condition.signal();
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -26,6 +26,7 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.IEndPointSnitch;
@@ -40,15 +41,15 @@ import org.apache.cassandra.utils.FBUtil
*/
public class DatacenterWriteResponseHandler extends WriteResponseHandler
{
- private int blockFor;
- private DatacenterEndPointSnitch endpointsnitch;
- private InetAddress localEndpoint;
+ private final AtomicInteger blockFor;
+ private final DatacenterEndPointSnitch endpointsnitch;
+ private final InetAddress localEndpoint;
public DatacenterWriteResponseHandler(int blockFor, String table)
{
// Response is been managed by the map so the waitlist size really
doesnt matter.
super(blockFor, table);
- this.blockFor = blockFor;
+ this.blockFor = new AtomicInteger(blockFor);
endpointsnitch = (DatacenterEndPointSnitch)
DatabaseDescriptor.getEndPointSnitch(table);
localEndpoint = FBUtilities.getLocalAddress();
}
@@ -56,17 +57,13 @@ public class DatacenterWriteResponseHand
@Override
public void response(Message message)
{
- // IF done look no futher.
- if (condition.isSignaled())
- {
- return;
- }
- //Is optimal to check if same datacenter than comparing Arrays.
+ //Is optimal to check if same datacenter than comparing Arrays.
+ int b = -1;
try
{
if (endpointsnitch.isInSameDataCenter(localEndpoint,
message.getFrom()))
{
- blockFor--;
+ b = blockFor.decrementAndGet();
}
}
catch (UnknownHostException e)
@@ -74,7 +71,7 @@ public class DatacenterWriteResponseHand
throw new RuntimeException(e);
}
responses.add(message);
- if (blockFor <= 0)
+ if (b == 0)
{
//Singnal when Quorum is recived.
condition.signal();
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
Wed Mar 10 17:17:38 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.service;
+import java.util.Collection;
import java.util.List;
import java.io.IOException;
@@ -33,7 +34,7 @@ public interface IResponseResolver<T> {
* repairs . Hence you need to derive a response resolver based on your
* needs from this interface.
*/
- public T resolve(List<Message> responses) throws
DigestMismatchException, IOException;
- public boolean isDataPresent(List<Message> responses);
+ public T resolve(Collection<Message> responses) throws
DigestMismatchException, IOException;
+ public boolean isDataPresent(Collection<Message> responses);
}
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -18,8 +18,11 @@
package org.apache.cassandra.service;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.io.IOException;
@@ -36,14 +39,14 @@ public class QuorumResponseHandler<T> im
{
protected static final Logger logger = Logger.getLogger(
QuorumResponseHandler.class );
protected final SimpleCondition condition = new SimpleCondition();
- protected final List<Message> responses;
+ protected final Collection<Message> responses;
private IResponseResolver<T> responseResolver;
private final long startTime;
public QuorumResponseHandler(int responseCount, IResponseResolver<T>
responseResolver)
{
- responses = new ArrayList<Message>(responseCount);
- this.responseResolver = responseResolver;
+ responses = new LinkedBlockingQueue<Message>();
+ this.responseResolver = responseResolver;
startTime = System.currentTimeMillis();
}
@@ -85,9 +88,6 @@ public class QuorumResponseHandler<T> im
public void response(Message message)
{
- if (condition.isSignaled())
- return;
-
responses.add(message);
if (responseResolver.isDataPresent(responses))
{
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Wed Mar 10 17:17:38 2010
@@ -51,7 +51,7 @@ public class RangeSliceResponseResolver
this.table = table;
}
- public List<Row> resolve(List<Message> responses) throws
DigestMismatchException, IOException
+ public List<Row> resolve(Collection<Message> responses) throws
DigestMismatchException, IOException
{
CollatingIterator collator = new CollatingIterator(new
Comparator<Pair<Row,InetAddress>>()
{
@@ -99,7 +99,7 @@ public class RangeSliceResponseResolver
return resolvedRows;
}
- public boolean isDataPresent(List<Message> responses)
+ public boolean isDataPresent(Collection<Message> responses)
{
return responses.size() >= sources.size();
}
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Wed Mar 10 17:17:38 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.cassandra.db.ColumnFamily;
@@ -63,7 +64,7 @@ public class ReadResponseResolver implem
* repair request should be scheduled.
*
*/
- public Row resolve(List<Message> responses) throws
DigestMismatchException, IOException
+ public Row resolve(Collection<Message> responses) throws
DigestMismatchException, IOException
{
long startTime = System.currentTimeMillis();
List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
@@ -159,7 +160,7 @@ public class ReadResponseResolver implem
return resolved;
}
- public boolean isDataPresent(List<Message> responses)
+ public boolean isDataPresent(Collection<Message> responses)
{
if (responses.size() < responseCount)
return false;
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -18,11 +18,15 @@
package org.apache.cassandra.service;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.IAsyncCallback;
@@ -37,8 +41,8 @@ public class WriteResponseHandler implem
protected static final Logger logger = Logger.getLogger(
WriteResponseHandler.class );
protected final SimpleCondition condition = new SimpleCondition();
private final int responseCount;
- protected final List<Message> responses;
- protected int localResponses;
+ protected final Collection<Message> responses;
+ protected AtomicInteger localResponses = new AtomicInteger(0);
private final long startTime;
public WriteResponseHandler(int responseCount, String table)
@@ -49,7 +53,7 @@ public class WriteResponseHandler implem
: "invalid response count " + responseCount;
this.responseCount = responseCount;
- responses = new ArrayList<Message>(responseCount);
+ responses = new LinkedBlockingQueue<Message>();
startTime = System.currentTimeMillis();
}
@@ -82,25 +86,21 @@ public class WriteResponseHandler implem
}
}
- public synchronized void response(Message message)
+ public void response(Message message)
{
- if (condition.isSignaled())
- return;
responses.add(message);
maybeSignal();
}
- public synchronized void localResponse()
+ public void localResponse()
{
- if (condition.isSignaled())
- return;
- localResponses++;
+ localResponses.addAndGet(1);
maybeSignal();
}
private void maybeSignal()
{
- if (responses.size() + localResponses >= responseCount)
+ if (responses.size() + localResponses.get() >= responseCount)
{
condition.signal();
}