Author: jbellis
Date: Tue Jan 25 17:09:43 2011
New Revision: 1063361
URL: http://svn.apache.org/viewvc?rev=1063361&view=rev
Log:
fix bugs in multi-DC replication
patch by ivancso; reviewed by jbellis for CASSANDRA-2051
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 25 17:09:43 2011
@@ -2,7 +2,7 @@
* buffer network stack to avoid inefficient small TCP messages while avoiding
the nagle/delayed ack problem (CASSANDRA-1896)
* check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)
- * more-efficient cross-DC replication (CASSANDRA-1530)
+ * more-efficient cross-DC replication (CASSANDRA-1530, -2051)
* upgrade to TFastFramedTransport (CASSANDRA-1743)
* avoid polluting page cache with commitlog or sstable writes
and seq scan operations (CASSANDRA-1470)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Tue Jan 25 17:09:43 2011
@@ -90,7 +90,7 @@ public class RowMutationVerbHandler impl
private void forwardToLocalNodes(Message message, byte[] forwardBytes)
throws UnknownHostException
{
// remove fwds from message to avoid infinite loop
- message.setHeader(RowMutation.FORWARD_HEADER, null);
+ message.removeHeader(RowMutation.FORWARD_HEADER);
int bytesPerInetAddress =
FBUtilities.getLocalAddress().getAddress().length;
assert forwardBytes.length >= bytesPerInetAddress;
@@ -110,7 +110,7 @@ public class RowMutationVerbHandler impl
// Send the original message to the address specified by the
FORWARD_HINT
// Let the response go back to the coordinator
- MessagingService.instance().sendOneWay(message, message.getFrom());
+ MessagingService.instance().sendOneWay(message, address);
offset += bytesPerInetAddress;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
Tue Jan 25 17:09:43 2011
@@ -97,6 +97,11 @@ public class Header
{
details_.put(key, value);
}
+
+ void removeDetail(String key)
+ {
+ details_.remove(key);
+ }
}
class HeaderSerializer implements ICompactSerializer<Header>
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
Tue Jan 25 17:09:43 2011
@@ -68,6 +68,11 @@ public class Message
{
header_.setDetail(key, value);
}
+
+ public void removeHeader(String key)
+ {
+ header_.removeDetail(key);
+ }
public byte[] getMessageBody()
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
Tue Jan 25 17:09:43 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.Message;
@@ -40,7 +41,7 @@ import org.apache.cassandra.utils.FBUtil
public class DatacenterReadCallback<T> extends ReadCallback<T>
{
private static final IEndpointSnitch snitch =
DatabaseDescriptor.getEndpointSnitch();
- private static final String localdc =
snitch.getDatacenter(FBUtilities.getLocalAddress());
+ private static final String localdc =
snitch.getDatacenter(FBUtilities.getLocalAddress());
private AtomicInteger localResponses;
public DatacenterReadCallback(IResponseResolver<T> resolver,
ConsistencyLevel consistencyLevel, String table)
@@ -54,8 +55,7 @@ public class DatacenterReadCallback<T> e
{
resolver.preprocess(message);
- int n;
- n = localdc.equals(snitch.getDatacenter(message.getFrom()))
+ int n = localdc.equals(snitch.getDatacenter(message.getFrom()))
? localResponses.decrementAndGet()
: localResponses.get();
@@ -66,6 +66,19 @@ public class DatacenterReadCallback<T> e
}
@Override
+ public void response(ReadResponse result)
+ {
+ ((ReadResponseResolver) resolver).injectPreProcessed(result);
+
+ int n = localResponses.decrementAndGet();
+
+ if (n == 0 && resolver.isDataPresent())
+ {
+ condition.signal();
+ }
+ }
+
+ @Override
public int determineBlockFor(ConsistencyLevel consistency_level, String
table)
{
NetworkTopologyStrategy stategy = (NetworkTopologyStrategy)
Table.open(table).getReplicationStrategy();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1063361&r1=1063360&r2=1063361&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Jan 25 17:09:43 2011
@@ -234,10 +234,9 @@ public class StorageProxy implements Sto
Message message = messages.getKey();
Iterator<InetAddress> iter = messages.getValue().iterator();
assert iter.hasNext();
-
+
// First endpoint in list is the destination for this group
InetAddress target = iter.next();
-
// Add all the other destinations that are bound for the same
dataCenter as a header in the primary message.
while (iter.hasNext())
@@ -382,7 +381,7 @@ public class StorageProxy implements Sto
{
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading digest for " + command + " from " +
message.getMessageId() + "@" + dataPoint);
+ logger.debug("reading data for " + command + " from " +
message.getMessageId() + "@" + dataPoint);
MessagingService.instance().sendRR(message, dataPoint,
handler);
}