Author: jbellis
Date: Thu May 12 20:33:46 2011
New Revision: 1102454
URL: http://svn.apache.org/viewvc?rev=1102454&view=rev
Log:
revert work-in-progress accidentally committed
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1102454&r1=1102453&r2=1102454&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Thu May 12 20:33:46 2011
@@ -259,7 +259,6 @@ public final class MessagingService impl
public String sendRR(Message message, InetAddress to, IMessageCallback cb)
{
String id = nextId();
- logger_.debug("Message id to {} is {}", to, id);
addCallback(cb, id, to);
sendOneWay(message, id, to);
return id;
@@ -267,9 +266,7 @@ public final class MessagingService impl
public void sendOneWay(Message message, InetAddress to)
{
- String id = nextId();
- logger_.debug("Message id to {} is {}", to, id);
- sendOneWay(message, id, to);
+ sendOneWay(message, nextId(), to);
}
public void sendReply(Message message, String id, InetAddress to)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1102454&r1=1102453&r2=1102454&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
Thu May 12 20:33:46 2011
@@ -55,15 +55,16 @@ public abstract class AbstractRowResolve
this.table = table;
}
- public ReadResponse preprocess(Message message)
+ public void preprocess(Message message)
{
byte[] body = message.getMessageBody();
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
try
{
ReadResponse result = ReadResponse.serializer().deserialize(new
DataInputStream(bufIn));
+ if (logger.isDebugEnabled())
+ logger.debug("Preprocessed {} response",
result.isDigestQuery() ? "digest" : "data");
replies.put(message, result);
- return result;
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1102454&r1=1102453&r2=1102454&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
Thu May 12 20:33:46 2011
@@ -48,11 +48,17 @@ public class DatacenterReadCallback<T> e
}
@Override
- protected boolean waitingFor(ReadResponse response, InetAddress from)
+ protected boolean waitingFor(Message message)
+ {
+ return localdc.equals(snitch.getDatacenter(message.getFrom()));
+ }
+
+ @Override
+ protected boolean waitingFor(ReadResponse response)
{
// cheat and leverage our knowledge that a local read is the only way
the ReadResponse
// version of this method gets called
- return localdc.equals(snitch.getDatacenter(from));
+ return true;
}
@Override
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1102454&r1=1102453&r2=1102454&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
Thu May 12 20:33:46 2011
@@ -20,7 +20,6 @@ package org.apache.cassandra.service;
import java.io.IOException;
-import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.net.Message;
public interface IResponseResolver<T> {
@@ -42,6 +41,6 @@ public interface IResponseResolver<T> {
*/
public T getData() throws IOException;
- public ReadResponse preprocess(Message message);
+ public void preprocess(Message message);
public Iterable<Message> getMessages();
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1102454&r1=1102453&r2=1102454&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Thu May 12 20:33:46 2011
@@ -24,11 +24,16 @@ import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import org.apache.commons.collections.iterators.CollatingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RangeSliceReply;
+import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.ReducingIterator;
@@ -109,7 +114,7 @@ public class RangeSliceResponseResolver
};
}
- public ReadResponse preprocess(Message message)
+ public void preprocess(Message message)
{
responses.add(message);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1102454&r1=1102453&r2=1102454&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
Thu May 12 20:33:46 2011
@@ -42,7 +42,6 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -128,13 +127,10 @@ public class ReadCallback<T> implements
public void response(Message message)
{
- ReadResponse result = resolver.preprocess(message);
- int n = waitingFor(result, message.getFrom())
+ resolver.preprocess(message);
+ int n = waitingFor(message)
? received.incrementAndGet()
: received.get();
- if (logger.isDebugEnabled())
- logger.debug("{} response; {} qualifying responses seen. Data is
{}present",
- new Object[] { result.isDigestQuery() ? "digest" :
"data", n, resolver.isDataPresent() ? "" : "not " });
if (n >= blockfor && resolver.isDataPresent())
{
condition.signal();
@@ -143,9 +139,18 @@ public class ReadCallback<T> implements
}
/**
+ * @return true if the message counts towards the blockfor threshold
+ * TODO turn the Message into a response so we don't need two versions of
this method
+ */
+ protected boolean waitingFor(Message message)
+ {
+ return true;
+ }
+
+ /**
* @return true if the response counts towards the blockfor threshold
*/
- protected boolean waitingFor(ReadResponse response, InetAddress from)
+ protected boolean waitingFor(ReadResponse response)
{
return true;
}
@@ -153,9 +158,7 @@ public class ReadCallback<T> implements
public void response(ReadResponse result)
{
((RowDigestResolver) resolver).injectPreProcessed(result);
- if (logger.isDebugEnabled())
- logger.debug("Preprocessed {} response", result.isDigestQuery() ?
"digest" : "data");
- int n = waitingFor(result, FBUtilities.getLocalAddress())
+ int n = waitingFor(result)
? received.incrementAndGet()
: received.get();
if (n >= blockfor && resolver.isDataPresent())