Author: jbellis
Date: Fri Dec 31 16:52:56 2010
New Revision: 1054135
URL: http://svn.apache.org/viewvc?rev=1054135&view=rev
Log:
encapsulate MessagingService.instance to avoid circular initializtion
dependencies
Modified:
cassandra/branches/cassandra-0.7/contrib/bmt_example/CassandraBulkLoader.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/GCInspector.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamOutSession.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
Modified:
cassandra/branches/cassandra-0.7/contrib/bmt_example/CassandraBulkLoader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/bmt_example/CassandraBulkLoader.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/contrib/bmt_example/CassandraBulkLoader.java
(original)
+++
cassandra/branches/cassandra-0.7/contrib/bmt_example/CassandraBulkLoader.java
Fri Dec 31 16:52:56 2010
@@ -73,7 +73,6 @@ import org.apache.cassandra.net.IAsyncRe
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
@@ -189,7 +188,7 @@ public class CassandraBulkLoader {
for (InetAddress endpoint:
StorageService.instance.getNaturalEndpoints(keyspace,
ByteBuffer.wrap(key.getBytes())))
{
/* Send message to end point */
- results.add(MessagingService.instance.sendRR(message,
endpoint));
+ results.add(MessagingService.instance().sendRR(message,
endpoint));
}
/* wait for acks */
for (IAsyncResult result : results)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -47,7 +47,7 @@ public class BinaryVerbHandler implement
Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
logger_.debug("binary " + rm + " applied. Sending response to "
+ message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance.sendOneWay(responseMessage,
message.getFrom());
+ MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
}
catch (Exception e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Fri Dec 31 16:52:56 2010
@@ -124,7 +124,7 @@ public class HintedHandOffManager
rm.add(cf);
Message message = rm.makeRowMutationMessage();
IWriteResponseHandler responseHandler =
WriteResponseHandler.create(endpoint);
- MessagingService.instance.sendRR(message, Arrays.asList(endpoint),
responseHandler);
+ MessagingService.instance().sendRR(message,
Arrays.asList(endpoint), responseHandler);
try
{
responseHandler.get();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -89,7 +89,7 @@ public class ReadVerbHandler implements
if (logger_.isDebugEnabled())
logger_.debug(String.format("Read key %s; sending response to
%...@%s",
FBUtilities.bytesToHex(command.key),
message.getMessageId(), message.getFrom()));
- MessagingService.instance.sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
}
catch (IOException ex)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -79,7 +79,7 @@ public class RowMutationVerbHandler impl
Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
logger_.debug(rm + " applied. Sending response to " +
message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance.sendOneWay(responseMessage,
message.getFrom());
+ MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
}
catch (IOException e)
{
@@ -110,7 +110,7 @@ public class RowMutationVerbHandler impl
// Send the original message to the address specified by the
FORWARD_HINT
// Let the response go back to the coordinator
- MessagingService.instance.sendOneWay(message, message.getFrom());
+ MessagingService.instance().sendOneWay(message, message.getFrom());
offset += bytesPerInetAddress;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -35,6 +35,6 @@ public class SchemaCheckVerbHandler impl
{
logger.debug("Received schema check request.");
Message response =
message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes());
- MessagingService.instance.sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendOneWay(response, message.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -78,7 +78,7 @@ public class TruncateVerbHandler impleme
Message responseMessage =
TruncateResponse.makeTruncateResponseMessage(message, response);
logger.debug("{} applied. Sending response to {...@{} ",
new Object[]{t, message.getMessageId(),
message.getFrom()});
- MessagingService.instance.sendOneWay(responseMessage,
message.getFrom());
+ MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
}
catch (IOException e)
{
@@ -101,6 +101,6 @@ public class TruncateVerbHandler impleme
{
TruncateResponse response = new TruncateResponse(t.keyspace,
t.columnFamily, false);
Message responseMessage =
TruncateResponse.makeTruncateResponseMessage(truncateRequestMessage, response);
- MessagingService.instance.sendOneWay(responseMessage,
truncateRequestMessage.getFrom());
+ MessagingService.instance().sendOneWay(responseMessage,
truncateRequestMessage.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
Fri Dec 31 16:52:56 2010
@@ -216,7 +216,7 @@ public class BootStrapper
{
Message message = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
BootstrapTokenCallback btc = new BootstrapTokenCallback();
- MessagingService.instance.sendRR(message, maxEndpoint, btc);
+ MessagingService.instance().sendRR(message, maxEndpoint, btc);
return btc.getToken();
}
@@ -255,7 +255,7 @@ public class BootStrapper
StorageService ss = StorageService.instance;
String tokenString =
StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
Message response =
message.getInternalReply(tokenString.getBytes(Charsets.UTF_8));
- MessagingService.instance.sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -76,7 +76,7 @@ public class GossipDigestAckVerbHandler
Message gDigestAck2Message =
Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAck2Message to {}", from);
- MessagingService.instance.sendOneWay(gDigestAck2Message, from);
+ MessagingService.instance().sendOneWay(gDigestAck2Message, from);
}
catch ( IOException e )
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -72,7 +72,7 @@ public class GossipDigestSynVerbHandler
Message gDigestAckMessage =
Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAckMessage to {}", from);
- MessagingService.instance.sendOneWay(gDigestAckMessage, from);
+ MessagingService.instance().sendOneWay(gDigestAckMessage, from);
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
Fri Dec 31 16:52:56 2010
@@ -57,7 +57,7 @@ public class Gossiper implements IFailur
try
{
//wait on messaging service to start listening
- MessagingService.instance.waitUntilListening();
+ MessagingService.instance().waitUntilListening();
/* Update the local heartbeat counter. */
endpointStateMap_.get(localEndpoint_).getHeartBeatState().updateHeartBeat();
@@ -326,7 +326,7 @@ public class Gossiper implements IFailur
InetAddress to = liveEndpoints.get(index);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to {} ...", to);
- MessagingService.instance.sendOneWay(message, to);
+ MessagingService.instance().sendOneWay(message, to);
return seeds_.contains(to);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
Fri Dec 31 16:52:56 2010
@@ -181,7 +181,7 @@ public class DynamicEndpointSnitch exten
return;
if (!registered)
{
- ILatencyPublisher handler =
(ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.REQUEST_RESPONSE);
+ ILatencyPublisher handler = (ILatencyPublisher)
MessagingService.instance().getVerbHandler(StorageService.Verb.REQUEST_RESPONSE);
if (handler != null)
{
handler.register(this);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
Fri Dec 31 16:52:56 2010
@@ -58,7 +58,7 @@ public class MessageDeliveryTask impleme
break;
}
- IVerbHandler verbHandler =
MessagingService.instance.getVerbHandler(verb);
+ IVerbHandler verbHandler =
MessagingService.instance().getVerbHandler(verb);
assert verbHandler != null : "unknown verb " + verb;
verbHandler.doVerb(message_);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Fri Dec 31 16:52:56 2010
@@ -83,8 +83,6 @@ public class MessagingService implements
private static Logger logger_ =
LoggerFactory.getLogger(MessagingService.class);
private static int LOG_DROPPED_INTERVAL_IN_MS = 5000;
- public static final MessagingService instance = new MessagingService();
-
private SocketThread socketThread;
private SimpleCondition listenGate;
private static final Map<StorageService.Verb, AtomicInteger>
droppedMessages = new EnumMap<StorageService.Verb,
AtomicInteger>(StorageService.Verb.class);
@@ -96,6 +94,15 @@ public class MessagingService implements
droppedMessages.put(verb, new AtomicInteger());
}
+ private static class MSHandle
+ {
+ public static final MessagingService instance = new MessagingService();
+ }
+ public static MessagingService instance()
+ {
+ return MSHandle.instance;
+ }
+
public Object clone() throws CloneNotSupportedException
{
//Prevents the singleton from being cloned
@@ -390,7 +397,7 @@ public class MessagingService implements
try
{
- instance.socketThread.close();
+ instance().socketThread.close();
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
Fri Dec 31 16:52:56 2010
@@ -74,7 +74,7 @@ public abstract class AbstractWriteRespo
{
// (non-destination hints are part of the callback and count towards
consistency only under CL.ANY)
if (writeEndpoints.contains(destination) || consistencyLevel ==
ConsistencyLevel.ANY)
- MessagingService.instance.addCallback(this,
hintedMessage.getMessageId());
+ MessagingService.instance().addCallback(this,
hintedMessage.getMessageId());
}
/** null message means "response from local write" */
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
Fri Dec 31 16:52:56 2010
@@ -228,7 +228,7 @@ public class AntiEntropyService
TreeRequest request(String sessionid, InetAddress remote, String ksname,
String cfname)
{
TreeRequest request = new TreeRequest(sessionid, remote, new
CFPair(ksname, cfname));
-
MessagingService.instance.sendOneWay(TreeRequestVerbHandler.makeVerb(request),
remote);
+
MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request),
remote);
return request;
}
@@ -239,7 +239,7 @@ public class AntiEntropyService
*/
void respond(Validator validator, InetAddress local)
{
- MessagingService ms = MessagingService.instance;
+ MessagingService ms = MessagingService.instance();
try
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Fri Dec 31 16:52:56 2010
@@ -85,11 +85,11 @@ class ConsistencyChecker implements Runn
if (logger_.isDebugEnabled())
logger_.debug("Reading consistency digest for " +
readCommand_.key + " from " + message.getMessageId() + "@[" +
StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance.addCallback(new DigestResponseHandler(),
message.getMessageId());
+ MessagingService.instance().addCallback(new
DigestResponseHandler(), message.getMessageId());
for (InetAddress endpoint : replicas_)
{
if (!endpoint.equals(dataSource))
- MessagingService.instance.sendOneWay(message, endpoint);
+ MessagingService.instance().sendOneWay(message, endpoint);
}
}
catch (IOException ex)
@@ -128,11 +128,11 @@ class ConsistencyChecker implements Runn
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Digest mismatch; re-reading " +
readCommand_.key + " from " + message.getMessageId() + "@[" +
StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance.addCallback(new
DataRepairHandler(), message.getMessageId());
+ MessagingService.instance().addCallback(new
DataRepairHandler(), message.getMessageId());
for (InetAddress endpoint : replicas_)
{
if (!endpoint.equals(dataSource))
- MessagingService.instance.sendOneWay(message,
endpoint);
+ MessagingService.instance().sendOneWay(message,
endpoint);
}
repairInvoked = true;
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/GCInspector.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/GCInspector.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/GCInspector.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/GCInspector.java
Fri Dec 31 16:52:56 2010
@@ -165,12 +165,12 @@ public class GCInspector
logger.info(String.format("%-25s%10s%10s",
"CompactionManager", "n/a",
CompactionManager.instance.getPendingTasks()));
int pendingCommands = 0;
- for (int n :
MessagingService.instance.getCommandPendingTasks().values())
+ for (int n :
MessagingService.instance().getCommandPendingTasks().values())
{
pendingCommands += n;
}
int pendingResponses = 0;
- for (int n :
MessagingService.instance.getResponsePendingTasks().values())
+ for (int n :
MessagingService.instance().getResponsePendingTasks().values())
{
pendingResponses += n;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -44,7 +44,7 @@ public class IndexScanVerbHandler implem
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + reply+ " to " +
message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance.sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
}
catch (Exception ex)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
Fri Dec 31 16:52:56 2010
@@ -93,7 +93,7 @@ public class MigrationManager implements
{
Message msg = makeVersionMessage(version);
for (InetAddress host : hosts)
- MessagingService.instance.sendOneWay(msg, host);
+ MessagingService.instance().sendOneWay(msg, host);
// this is for notifying nodes as they arrive in the cluster.
if (!StorageService.instance.isClientMode())
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA,
StorageService.valueFactory.migration(version));
@@ -161,7 +161,7 @@ public class MigrationManager implements
try
{
Message msg = makeMigrationMessage(migrations);
- MessagingService.instance.sendOneWay(msg, host);
+ MessagingService.instance().sendOneWay(msg, host);
}
catch (IOException ex)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -53,7 +53,7 @@ public class RangeSliceVerbHandler imple
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + reply+ " to " +
message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance.sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
}
catch (Exception ex)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Fri Dec 31 16:52:56 2010
@@ -139,7 +139,7 @@ public class ReadResponseResolver implem
{
throw new IOError(e);
}
- MessagingService.instance.sendOneWay(repairMessage,
endpoints.get(i));
+ MessagingService.instance().sendOneWay(repairMessage,
endpoints.get(i));
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Fri Dec 31 16:52:56 2010
@@ -149,7 +149,7 @@ public class StorageLoadBalancer impleme
public void doVerb(Message message)
{
Message reply = message.getInternalReply(new byte[]
{(byte)(isMoveable_.get() ? 1 : 0)});
- MessagingService.instance.sendOneWay(reply, message.getFrom());
+ MessagingService.instance().sendOneWay(reply, message.getFrom());
if ( isMoveable_.get() )
{
// MoveMessage moveMessage =
(MoveMessage)message.getMessageBody()[0];
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Fri Dec 31 16:52:56 2010
@@ -146,7 +146,7 @@ public class StorageProxy implements Sto
if (unhintedMessage == null)
{
unhintedMessage = rm.makeRowMutationMessage();
-
MessagingService.instance.addCallback(responseHandler,
unhintedMessage.getMessageId());
+
MessagingService.instance().addCallback(responseHandler,
unhintedMessage.getMessageId());
}
if (logger.isDebugEnabled())
logger.debug("insert writing key " +
FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() +
"@" + destination);
@@ -222,7 +222,7 @@ public class StorageProxy implements Sto
{
// direct write to local DC
assert
primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null;
- MessagingService.instance.sendOneWay(primaryMessage,
target);
+ MessagingService.instance().sendOneWay(primaryMessage,
target);
}
else
{
@@ -240,7 +240,7 @@ public class StorageProxy implements Sto
}
}
- MessagingService.instance.sendOneWay(primaryMessage, target);
+ MessagingService.instance().sendOneWay(primaryMessage, target);
}
}
@@ -329,7 +329,7 @@ public class StorageProxy implements Sto
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
logger.debug("weakread reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
- remoteResults.put(command,
MessagingService.instance.sendRR(message, endPoint));
+ remoteResults.put(command,
MessagingService.instance().sendRR(message, endPoint));
}
}
@@ -416,7 +416,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ?
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" +
endpoint);
}
- MessagingService.instance.sendRR(messages, endpoints, handler);
+ MessagingService.instance().sendRR(messages, endpoints, handler);
quorumResponseHandlers.add(handler);
commandEndpoints.add(endpoints);
}
@@ -446,7 +446,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
Message messageRepair = command.makeReadMessage();
- MessagingService.instance.sendRR(messageRepair,
commandEndpoints.get(i), handler);
+ MessagingService.instance().sendRR(messageRepair,
commandEndpoints.get(i), handler);
if (repairResponseHandlers == null)
repairResponseHandlers = new
ArrayList<QuorumResponseHandler<Row>>();
repairResponseHandlers.add(handler);
@@ -528,7 +528,7 @@ public class StorageProxy implements Sto
// TODO bail early if live endpoints can't satisfy
requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
- MessagingService.instance.sendRR(message, endpoint,
handler);
+ MessagingService.instance().sendRR(message, endpoint,
handler);
if (logger.isDebugEnabled())
logger.debug("reading " + c2 + " from " +
message.getMessageId() + "@" + endpoint);
}
@@ -576,7 +576,7 @@ public class StorageProxy implements Sto
final Message msg = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
final CountDownLatch latch = new CountDownLatch(liveHosts.size());
// an empty message acts as a request to the SchemaCheckVerbHandler.
- MessagingService.instance.sendRR(msg, liveHosts, new IAsyncCallback()
+ MessagingService.instance().sendRR(msg, liveHosts, new IAsyncCallback()
{
public void response(Message msg)
{
@@ -783,7 +783,7 @@ public class StorageProxy implements Sto
Message message = command.getMessage();
for (InetAddress endpoint : liveEndpoints)
{
- MessagingService.instance.sendRR(message, endpoint, handler);
+ MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + command + " from " +
message.getMessageId() + "@" + endpoint);
}
@@ -879,7 +879,7 @@ public class StorageProxy implements Sto
logger.debug("Starting to send truncate messages to hosts {}",
allEndpoints);
Truncation truncation = new Truncation(keyspace, cfname);
Message message = truncation.makeTruncationMessage();
- MessagingService.instance.sendRR(message, allEndpoints,
responseHandler);
+ MessagingService.instance().sendRR(message, allEndpoints,
responseHandler);
// Wait for all
logger.debug("Sent all truncate messages, now waiting for {}
responses", blockFor);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Fri Dec 31 16:52:56 2010
@@ -217,30 +217,30 @@ public class StorageService implements I
}
/* register the verb handlers */
- MessagingService.instance.registerVerbHandlers(Verb.BINARY, new
BinaryVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.MUTATION, new
RowMutationVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.READ_REPAIR, new
ReadRepairVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.READ, new
ReadVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.RANGE_SLICE, new
RangeSliceVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.INDEX_SCAN, new
IndexScanVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.BINARY, new
BinaryVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.MUTATION, new
RowMutationVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.READ_REPAIR, new
ReadRepairVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.READ, new
ReadVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.RANGE_SLICE, new
RangeSliceVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.INDEX_SCAN, new
IndexScanVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
- MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN,
new BootStrapper.BootstrapTokenVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST,
new StreamRequestVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.STREAM_REPLY, new
StreamReplyVerbHandler());
-
MessagingService.instance.registerVerbHandlers(Verb.REPLICATION_FINISHED, new
ReplicationFinishedVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.REQUEST_RESPONSE,
new ResponseVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.INTERNAL_RESPONSE,
new ResponseVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new
TreeRequestVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new
AntiEntropyService.TreeResponseVerbHandler());
-
- MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN,
new GossipDigestSynVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK,
new GossipDigestAckVerbHandler());
-
MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new
GossipDigestAck2VerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.BOOTSTRAP_TOKEN,
new BootStrapper.BootstrapTokenVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.STREAM_REQUEST,
new StreamRequestVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.STREAM_REPLY,
new StreamReplyVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.REPLICATION_FINISHED, new
ReplicationFinishedVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.REQUEST_RESPONSE, new
ResponseVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.INTERNAL_RESPONSE, new
ResponseVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.TREE_REQUEST,
new TreeRequestVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.TREE_RESPONSE,
new AntiEntropyService.TreeResponseVerbHandler());
+
+
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new
GossipDigestSynVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new
GossipDigestAckVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new
GossipDigestAck2VerbHandler());
-
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new
DefinitionsAnnounceVerbHandler());
-
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE,
new DefinitionsUpdateResponseVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.TRUNCATE, new
TruncateVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.SCHEMA_CHECK, new
SchemaCheckVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new
DefinitionsAnnounceVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE,
new DefinitionsUpdateResponseVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.TRUNCATE, new
TruncateVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK,
new SchemaCheckVerbHandler());
// spin up the streaming serivice so it is available for jmx tools.
if (StreamingService.instance == null)
@@ -275,7 +275,7 @@ public class StorageService implements I
setMode("Client", false);
Gossiper.instance.register(this);
Gossiper.instance.start(FBUtilities.getLocalAddress(),
(int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
- MessagingService.instance.listen(FBUtilities.getLocalAddress());
+ MessagingService.instance().listen(FBUtilities.getLocalAddress());
// sleep a while to allow gossip to warm up (the other nodes need to
know about this one before they can reply).
try
@@ -331,7 +331,7 @@ public class StorageService implements I
Gossiper.instance.register(migrationManager);
Gossiper.instance.start(FBUtilities.getLocalAddress(),
SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
- MessagingService.instance.listen(FBUtilities.getLocalAddress());
+ MessagingService.instance().listen(FBUtilities.getLocalAddress());
StorageLoadBalancer.instance.startBroadcasting();
MigrationManager.announce(DatabaseDescriptor.getDefsVersion(),
DatabaseDescriptor.getSeeds());
@@ -878,7 +878,7 @@ public class StorageService implements I
IFailureDetector failureDetector = FailureDetector.instance;
while (failureDetector.isAlive(remote))
{
- IAsyncResult iar = MessagingService.instance.sendRR(msg, remote);
+ IAsyncResult iar = MessagingService.instance().sendRR(msg, remote);
try
{
iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
@@ -1016,7 +1016,7 @@ public class StorageService implements I
public void onDead(InetAddress endpoint, EndpointState state)
{
- MessagingService.instance.convict(endpoint);
+ MessagingService.instance().convict(endpoint);
}
/** raw load value */
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
Fri Dec 31 16:52:56 2010
@@ -39,6 +39,6 @@ public class ReplicationFinishedVerbHand
Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY);
if (logger.isDebugEnabled())
logger.debug("Replying to " + msg.getMessageId() + "@" +
msg.getFrom());
- MessagingService.instance.sendOneWay(response, msg.getFrom());
+ MessagingService.instance().sendOneWay(response, msg.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
Fri Dec 31 16:52:56 2010
@@ -62,7 +62,7 @@ public class StreamIn
logger.debug("Requesting from {} ranges {}", source,
StringUtils.join(ranges, ", "));
StreamInSession session = StreamInSession.create(source, callback);
Message message = new
StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName,
session.getSessionId()).makeMessage();
- MessagingService.instance.sendOneWay(message, source);
+ MessagingService.instance().sendOneWay(message, source);
}
/** Translates remote files to local files by creating a local sstable per
remote sstable. */
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java
Fri Dec 31 16:52:56 2010
@@ -115,14 +115,14 @@ public class StreamInSession
current = null;
StreamReply reply = new StreamReply(remoteFile.getFilename(),
getSessionId(), StreamReply.Status.FILE_FINISHED);
// send a StreamStatus message telling the source node it can delete
this file
- MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
+ MessagingService.instance().sendOneWay(reply.createMessage(),
getHost());
}
public void retry(PendingFile remoteFile) throws IOException
{
StreamReply reply = new StreamReply(remoteFile.getFilename(),
getSessionId(), StreamReply.Status.FILE_RETRY);
logger.info("Streaming of file {} from {} failed: requesting a
retry.", remoteFile, this);
- MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
+ MessagingService.instance().sendOneWay(reply.createMessage(),
getHost());
}
public void closeIfFinished() throws IOException
@@ -158,7 +158,7 @@ public class StreamInSession
// send reply to source that we're done
StreamReply reply = new StreamReply("", getSessionId(),
StreamReply.Status.SESSION_FINISHED);
logger.info("Finished streaming session {} from {}",
getSessionId(), getHost());
- MessagingService.instance.sendOneWay(reply.createMessage(),
getHost());
+ MessagingService.instance().sendOneWay(reply.createMessage(),
getHost());
if (callback != null)
callback.run();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamOutSession.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Fri Dec 31 16:52:56 2010
@@ -108,7 +108,7 @@ public class StreamOutSession
if (logger.isDebugEnabled())
logger.debug("Streaming {} ...", pf);
currentFile = pf.getFilename();
- MessagingService.instance.stream(new StreamHeader(table,
getSessionId(), pf), getHost());
+ MessagingService.instance().stream(new StreamHeader(table,
getSessionId(), pf), getHost());
}
public void startNext() throws IOException
@@ -173,6 +173,6 @@ public class StreamOutSession
StreamHeader header = new StreamHeader(table, getSessionId(), first,
files.values());
logger.info("Streaming to {}", getHost());
logger.debug("Files are {}", StringUtils.join(files.values(), ","));
- MessagingService.instance.stream(header, getHost());
+ MessagingService.instance().stream(header, getHost());
}
}
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1054135&r1=1054134&r2=1054135&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
Fri Dec 31 16:52:56 2010
@@ -72,7 +72,7 @@ public class RemoveTest extends CleanupH
// create a ring of 5 nodes
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, 6);
- MessagingService.instance.listen(FBUtilities.getLocalAddress());
+ MessagingService.instance().listen(FBUtilities.getLocalAddress());
Gossiper.instance.start(FBUtilities.getLocalAddress(), 1);
for (int i = 0; i < 6; i++)
{
@@ -142,7 +142,7 @@ public class RemoveTest extends CleanupH
for (InetAddress host : hosts)
{
Message msg = new Message(host,
StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
- MessagingService.instance.sendRR(msg,
FBUtilities.getLocalAddress());
+ MessagingService.instance().sendRR(msg,
FBUtilities.getLocalAddress());
}
remover.join();
@@ -215,7 +215,7 @@ public class RemoveTest extends CleanupH
assertEquals(Stage.MISC, msg.getMessageType());
// simulate a response from remote server
Message response = msg.getReply(FBUtilities.getLocalAddress(),
new byte[]{ });
- MessagingService.instance.sendOneWay(response,
FBUtilities.getLocalAddress());
+ MessagingService.instance().sendOneWay(response,
FBUtilities.getLocalAddress());
return null;
}
else