Author: jbellis
Date: Tue Nov 17 13:34:06 2009
New Revision: 881281
URL: http://svn.apache.org/viewvc?rev=881281&view=rev
Log:
add WriteResponseHandler combining the important parts of QuorumResponseHandler
and WriteResponseResolver.
In particular, note that we (correctly) never send a write response of false,
letting the timeout take care
of that should-never-happen case. Optimize local writes in insertBlocking, and
fix HH (hinted handoff).
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-558
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Tue Nov 17 13:34:06 2009
@@ -41,7 +41,8 @@
interfaces (CASSANDRA-546)
* stress.py benchmarking tool improvements (several tickets)
* optimized replica placement code (CASSANDRA-525)
- * faster log replay on restart (CASSANDRA-539, -540)
+ * faster log replay on restart (CASSANDRA-539, CASSANDRA-540)
+ * optimized local-node writes (CASSANDRA-558)
0.4.2
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Tue Nov 17 13:34:06 2009
@@ -101,7 +101,7 @@
return instance_;
}
- private static boolean sendMessage(InetAddress endPoint, String tableName,
String key) throws DigestMismatchException, TimeoutException, IOException,
InvalidRequestException
+ private static boolean sendMessage(InetAddress endPoint, String tableName,
String key) throws IOException
{
if (!FailureDetector.instance().isAlive(endPoint))
{
@@ -112,10 +112,18 @@
Row row = table.get(key);
RowMutation rm = new RowMutation(tableName, row);
Message message = rm.makeRowMutationMessage();
- QuorumResponseHandler<Boolean> quorumResponseHandler = new
QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
- MessagingService.instance().sendRR(message, new InetAddress[] {
endPoint }, quorumResponseHandler);
+ WriteResponseHandler responseHandler = new WriteResponseHandler(1);
+ MessagingService.instance().sendRR(message, new InetAddress[] {
endPoint }, responseHandler);
- return quorumResponseHandler.get();
+ try
+ {
+ responseHandler.get();
+ }
+ catch (TimeoutException e)
+ {
+ return false;
+ }
+ return true;
}
private static void deleteEndPoint(byte[] endpointAddress, String
tableName, byte[] key, long timestamp) throws IOException
@@ -205,7 +213,7 @@
Collection<IColumn> endpoints = keyColumn.getSubColumns();
for (IColumn hintEndPoint : endpoints)
{
- if (Arrays.equals(hintEndPoint.name(), targetEPBytes) &&
sendMessage(endPoint, null, keyStr))
+ if (Arrays.equals(hintEndPoint.name(), targetEPBytes) &&
sendMessage(endPoint, tableName, keyStr))
{
if (endpoints.size() == 1)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Tue Nov 17 13:34:06 2009
@@ -31,6 +31,7 @@
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.service.IResponseResolver;
import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -55,9 +56,9 @@
public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token,
TokenMetadata metadata);
- public <T> QuorumResponseHandler<T>
getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int
consistency_level)
+ public WriteResponseHandler getWriteResponseHandler(int blockFor, int
consistency_level)
{
- return new QuorumResponseHandler<T>(blockFor, responseResolver);
+ return new WriteResponseHandler(blockFor);
}
public ArrayList<InetAddress> getNaturalEndpoints(Token token)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
Tue Nov 17 13:34:06 2009
@@ -202,17 +202,16 @@
* return a DCQRH with a map of all the DC rep facor.
*/
@Override
- public <T> QuorumResponseHandler<T>
getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int
consistency_level)
+ public WriteResponseHandler getWriteResponseHandler(int blockFor, int
consistency_level)
{
if (consistency_level == ConsistencyLevel.DCQUORUM)
{
- List<InetAddress> endpoints = getLocalEndPoints();
- return new DatacenterQuorumResponseHandler<T>(locQFactor,
responseResolver);
+ return new DatacenterQuorumResponseHandler(locQFactor);
}
else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
{
- return new
DatacenterQuorumSyncResponseHandler<T>(getQuorumRepFactor(), responseResolver);
+ return new
DatacenterQuorumSyncResponseHandler(getQuorumRepFactor());
}
- return super.getResponseHandler(responseResolver, blockFor,
consistency_level);
+ return super.getWriteResponseHandler(blockFor, consistency_level);
}
}
\ No newline at end of file
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Tue Nov 17 13:34:06 2009
@@ -322,15 +322,20 @@
*/
public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
{
- String messageId = message.getMessageId();
- callbackMap_.put(messageId, cb);
+ String messageId = message.getMessageId();
+ addCallback(cb, messageId);
for ( int i = 0; i < to.length; ++i )
{
sendOneWay(message, to[i]);
}
return messageId;
}
-
+
+ public void addCallback(IAsyncCallback cb, String messageId)
+ {
+ callbackMap_.put(messageId, cb);
+ }
+
/**
* Send a message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
@@ -344,7 +349,7 @@
public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
- callbackMap_.put(messageId, cb);
+ addCallback(cb, messageId);
sendOneWay(message, to);
return messageId;
}
@@ -369,7 +374,7 @@
throw new IllegalArgumentException("Number of messages and the
number of endpoints need to be same.");
}
String groupId = GuidGenerator.guid();
- callbackMap_.put(groupId, cb);
+ addCallback(cb, groupId);
for ( int i = 0; i < messages.length; ++i )
{
messages[i].setMessageId(groupId);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Tue Nov 17 13:34:06 2009
@@ -16,16 +16,16 @@
* provided in the input map. it will block till we recive response from (DC,
n)
* nodes.
*/
-public class DatacenterQuorumResponseHandler<T> extends
QuorumResponseHandler<T>
+public class DatacenterQuorumResponseHandler extends WriteResponseHandler
{
private int blockFor;
private IEndPointSnitch endpointsnitch;
private InetAddress localEndpoint;
- public DatacenterQuorumResponseHandler(int blockFor, IResponseResolver<T>
responseResolver)
+ public DatacenterQuorumResponseHandler(int blockFor)
{
// Response is been managed by the map so the waitlist size really
doesnt matter.
- super(blockFor, responseResolver);
+ super(blockFor);
this.blockFor = blockFor;
endpointsnitch = DatabaseDescriptor.getEndPointSnitch();
localEndpoint = FBUtilities.getLocalAddress();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
Tue Nov 17 13:34:06 2009
@@ -15,15 +15,15 @@
* provided in the input map. it will block till we recive response from
* n nodes in each of our data centers.
*/
-public class DatacenterQuorumSyncResponseHandler<T> extends
QuorumResponseHandler<T>
+public class DatacenterQuorumSyncResponseHandler extends WriteResponseHandler
{
private final Map<String, Integer> dcResponses = new HashMap<String,
Integer>();
private final Map<String, Integer> responseCounts;
- public DatacenterQuorumSyncResponseHandler(Map<String, Integer>
responseCounts, IResponseResolver<T> responseResolver)
+ public DatacenterQuorumSyncResponseHandler(Map<String, Integer>
responseCounts)
{
// Response is been managed by the map so make it 1 for the superclass.
- super(1, responseResolver);
+ super(1);
this.responseCounts = responseCounts;
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Nov 17 13:34:06 2009
@@ -47,7 +47,7 @@
*/
public class ReadResponseResolver implements IResponseResolver<Row>
{
- private static Logger logger_ =
Logger.getLogger(WriteResponseResolver.class);
+ private static Logger logger_ =
Logger.getLogger(ReadResponseResolver.class);
/*
* This method for resolving read data should look at the timestamps of
each
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Nov 17 13:34:06 2009
@@ -19,7 +19,6 @@
import java.io.IOError;
import java.io.IOException;
-import java.io.IOError;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -149,53 +148,85 @@
}
}
- public static void insertBlocking(RowMutation rm, int consistency_level)
throws UnavailableException
+ public static void insertBlocking(final RowMutation rm, int
consistency_level) throws UnavailableException
{
long startTime = System.currentTimeMillis();
- Message message;
- try
- {
- message = rm.makeRowMutationMessage();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
try
{
List<InetAddress> naturalEndpoints =
StorageService.instance().getNaturalEndpoints(rm.key());
Map<InetAddress, InetAddress> endpointMap =
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
int blockFor = determineBlockFor(naturalEndpoints.size(),
endpointMap.size(), consistency_level);
- List<InetAddress> primaryNodes = getUnhintedNodes(endpointMap);
- if (primaryNodes.size() < blockFor) // guarantee blockFor = W live
nodes.
- {
- throw new UnavailableException();
- }
- QuorumResponseHandler<Boolean> quorumResponseHandler =
StorageService.instance().getResponseHandler(new WriteResponseResolver(),
blockFor, consistency_level);
- if (logger.isDebugEnabled())
- logger.debug("insertBlocking writing key " + rm.key() + " to "
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.values(), ", ")
+ "]");
- // Get all the targets and stick them in an array
- MessagingService.instance().sendRR(message,
primaryNodes.toArray(new InetAddress[primaryNodes.size()]),
quorumResponseHandler);
- try
+ // avoid starting a write we know can't achieve the required
consistency
+ int liveNodes = 0;
+ for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
{
- if (!quorumResponseHandler.get())
- throw new UnavailableException();
+ if (entry.getKey().equals(entry.getValue()))
+ {
+ liveNodes++;
+ }
}
- catch (DigestMismatchException e)
+ if (liveNodes < blockFor)
{
- throw new AssertionError(e);
+ throw new UnavailableException();
}
- if (primaryNodes.size() < endpointMap.size()) // Do we need to
bother with Hinted Handoff?
+
+ // send out the writes, as in insert() above, but this time with a
callback that tracks responses
+ final WriteResponseHandler responseHandler =
StorageService.instance().getWriteResponseHandler(blockFor, consistency_level);
+ Message unhintedMessage = null;
+ for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
{
- for (Map.Entry<InetAddress, InetAddress> e :
endpointMap.entrySet())
+ InetAddress target = entry.getKey();
+ InetAddress hintedTarget = entry.getValue();
+
+ if (target.equals(hintedTarget))
{
- if (!e.getKey().equals(e.getValue())) // Hinted Handoff to
target
+ if (target.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing local key " +
rm.key());
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ rm.apply();
+ responseHandler.localResponse();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ };
+
StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+ }
+ else
{
- MessagingService.instance().sendOneWay(message,
e.getValue());
+ if (unhintedMessage == null)
+ {
+ unhintedMessage = rm.makeRowMutationMessage();
+
MessagingService.instance().addCallback(responseHandler,
unhintedMessage.getMessageId());
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " + rm.key() + "
to " + unhintedMessage.getMessageId() + "@" + target);
+
MessagingService.instance().sendOneWay(unhintedMessage, target);
}
}
+ else
+ {
+ // (hints aren't part of the callback since they don't
count towards consistency until they are on the final destination node)
+ Message hintedMessage = rm.makeRowMutationMessage();
+ hintedMessage.addHeader(RowMutation.HINT,
target.getAddress());
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " + rm.key() + " to "
+ hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
+ MessagingService.instance().sendOneWay(hintedMessage,
hintedTarget);
+ }
}
+
+ // wait for writes. throws timeoutexception if necessary
+ responseHandler.get();
}
catch (TimeoutException e)
{
@@ -211,19 +242,6 @@
}
}
- private static List<InetAddress> getUnhintedNodes(Map<InetAddress,
InetAddress> endpointMap)
- {
- List<InetAddress> liveEndPoints = new
ArrayList<InetAddress>(endpointMap.size());
- for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
- {
- if (e.getKey().equals(e.getValue()))
- {
- liveEndPoints.add(e.getKey());
- }
- }
- return liveEndPoints;
- }
-
private static int determineBlockFor(int naturalTargets, int
hintedTargets, int consistency_level)
{
assert naturalTargets >= 1;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=881281&r1=881280&r2=881281&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Nov 17 13:34:06 2009
@@ -1037,9 +1037,9 @@
unbootstrap(finishMoving);
}
- public <T> QuorumResponseHandler<T>
getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int
consistency_level)
+ public WriteResponseHandler getWriteResponseHandler(int blockFor, int
consistency_level)
{
- return replicationStrategy_.getResponseHandler(responseResolver,
blockFor, consistency_level);
+ return replicationStrategy_.getWriteResponseHandler(blockFor,
consistency_level);
}
public AbstractReplicationStrategy getReplicationStrategy()
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=881281&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Tue Nov 17 13:34:06 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.SimpleCondition;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Callback that blocks a writer until the required number of write
+ * acknowledgements (remote responses plus locally-applied writes) have been
+ * collected, or the RPC timeout expires.
+ *
+ * Writes are never acknowledged negatively: a failed write simply never
+ * responds and is reported through the TimeoutException thrown by get().
+ */
+public class WriteResponseHandler implements IAsyncCallback
+{
+    protected static final Logger logger = Logger.getLogger( WriteResponseHandler.class );
+    protected final SimpleCondition condition = new SimpleCondition();
+    private final int responseCount;
+    // remote acknowledgements collected so far; guarded by this
+    protected final List<Message> responses;
+    // acknowledgements delivered via localResponse(); guarded by this
+    protected int localResponses;
+    // creation time; get() measures the RPC timeout from this instant
+    private final long startTime;
+
+    /**
+     * @param responseCount number of acknowledgements (remote + local) required
+     *                      before get() returns successfully
+     */
+    public WriteResponseHandler(int responseCount)
+    {
+        assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor()
+            : "invalid response count " + responseCount;
+
+        this.responseCount = responseCount;
+        responses = new ArrayList<Message>(responseCount);
+        startTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Blocks until responseCount acknowledgements have been received, or until
+     * the RPC timeout (counted from construction of this handler) elapses.
+     * Callbacks registered under the message ids of received responses are
+     * deregistered on exit.
+     *
+     * @throws TimeoutException if too few acknowledgements arrived in time
+     */
+    public void get() throws TimeoutException
+    {
+        try
+        {
+            // whatever part of the RPC timeout is left; time already spent since
+            // construction counts against it rather than extending it
+            long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+            boolean success;
+            try
+            {
+                // with no time left, don't wait at all -- just check whether we're already done
+                success = timeout > 0
+                          ? condition.await(timeout, TimeUnit.MILLISECONDS)
+                          : condition.isSignaled();
+            }
+            catch (InterruptedException ex)
+            {
+                throw new AssertionError(ex);
+            }
+
+            if (!success)
+            {
+                int acknowledged;
+                synchronized (this)
+                {
+                    acknowledged = responses.size() + localResponses;
+                }
+                throw new TimeoutException("Operation timed out - received only " + acknowledged + " responses");
+            }
+        }
+        finally
+        {
+            // snapshot under the lock: after a timeout the condition is unsignaled,
+            // so late responses may still be appended concurrently
+            List<Message> snapshot;
+            synchronized (this)
+            {
+                snapshot = new ArrayList<Message>(responses);
+            }
+            for (Message response : snapshot)
+            {
+                MessagingService.removeRegisteredCallback(response.getMessageId());
+            }
+        }
+    }
+
+    /**
+     * Records a remote acknowledgement. Ignored once enough acknowledgements
+     * have already been collected.
+     */
+    public synchronized void response(Message message)
+    {
+        if (condition.isSignaled())
+            return;
+        responses.add(message);
+        maybeSignal();
+    }
+
+    /**
+     * Records an acknowledgement that did not arrive as a message (e.g. a
+     * mutation applied on this node). Ignored once enough acknowledgements
+     * have already been collected.
+     */
+    public synchronized void localResponse()
+    {
+        if (condition.isSignaled())
+            return;
+        localResponses++;
+        maybeSignal();
+    }
+
+    // caller must hold the lock on this
+    private void maybeSignal()
+    {
+        if (responses.size() + localResponses >= responseCount)
+        {
+            condition.signal();
+        }
+    }
+}
\ No newline at end of file