Author: jbellis
Date: Wed Oct 7 16:23:43 2009
New Revision: 822793
URL: http://svn.apache.org/viewvc?rev=822793&view=rev
Log:
rename getMessagingInstance -> instance; r/m unused methods from FBUtilities
patch by jbellis; reviewed by Eric Evans for CASSANDRA-385
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
Wed Oct 7 16:23:43 2009
@@ -53,7 +53,7 @@
dos.writeUTF(sstable.getFilename());
}
Message response = message.getReply(
StorageService.getLocalStorageEndPoint(), bos.toByteArray());
- MessagingService.getMessagingInstance().sendOneWay(response,
message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
}
catch (IOException ex)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Wed Oct 7 16:23:43 2009
@@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
@@ -97,7 +96,7 @@
RowMutation rm = new RowMutation(tableName, purgedRow);
Message message = rm.makeRowMutationMessage();
QuorumResponseHandler<Boolean> quorumResponseHandler = new
QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
- MessagingService.getMessagingInstance().sendRR(message, new
EndPoint[]{ endPoint }, quorumResponseHandler);
+ MessagingService.instance().sendRR(message, new EndPoint[]{ endPoint
}, quorumResponseHandler);
return quorumResponseHandler.get();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Wed Oct 7 16:23:43 2009
@@ -100,7 +100,7 @@
Message response =
message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
if (logger_.isDebugEnabled())
logger_.debug("Read key " + readCommand.key + "; sending
response to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.getMessagingInstance().sendOneWay(response,
message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
/* Do read repair if header of the message says so */
if (message.getHeader(ReadCommand.DO_REPAIR) != null)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Wed Oct 7 16:23:43 2009
@@ -64,7 +64,7 @@
Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
logger_.debug(rm + " applied. Sending response to " +
message.getMessageId() + "@" + message.getFrom());
-
MessagingService.getMessagingInstance().sendOneWay(responseMessage,
message.getFrom());
+ MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
}
catch (IOException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed
Oct 7 16:23:43 2009
@@ -196,7 +196,7 @@
/* Send a StreamStatusMessage object which may require the source
node to re-stream certain files. */
StreamContextManager.StreamStatusMessage streamStatusMessage = new
StreamContextManager.StreamStatusMessage(streamStatus);
Message message =
StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.getMessagingInstance().sendOneWay(message, to);
+ MessagingService.instance().sendOneWay(message, to);
/* If we're done with everything for this host, remove from
bootstrap sources */
if (StreamContextManager.isDone(to.getHost()))
StorageService.instance().removeBootstrapSource(to);
@@ -246,7 +246,7 @@
if (logger_.isDebugEnabled())
logger_.debug("Sending a bootstrap initiate done message
...");
Message doneMessage = new Message(
StorageService.getLocalStorageEndPoint(), "",
StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
-
MessagingService.getMessagingInstance().sendOneWay(doneMessage,
message.getFrom());
+ MessagingService.instance().sendOneWay(doneMessage,
message.getFrom());
}
catch ( IOException ex )
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Wed Oct 7 16:23:43 2009
@@ -173,7 +173,7 @@
Message message =
BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
if (logger_.isDebugEnabled())
logger_.debug("Sending a bootstrap initiate message to " +
target + " ...");
- MessagingService.getMessagingInstance().sendOneWay(message,
target);
+ MessagingService.instance().sendOneWay(message, target);
if (logger_.isDebugEnabled())
logger_.debug("Waiting for transfer to " + target + " to
complete");
StreamManager.instance(target).waitForStreamCompletion();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
Wed Oct 7 16:23:43 2009
@@ -218,7 +218,7 @@
Message message =
BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage);
if (logger_.isDebugEnabled())
logger_.debug("Sending the BootstrapMetadataMessage to " +
source);
- MessagingService.getMessagingInstance().sendOneWay(message,
source);
+ MessagingService.instance().sendOneWay(message, source);
StorageService.instance().addBootstrapSource(source);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
Wed Oct 7 16:23:43 2009
@@ -137,10 +137,10 @@
/* register with the Failure Detector for receiving Failure detector
events */
FailureDetector.instance().registerFailureDetectionEventListener(this);
/* register the verbs */
-
MessagingService.getMessagingInstance().registerVerbHandlers(JOIN_VERB_HANDLER,
new JoinVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB,
new GossipDigestSynVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB,
new GossipDigestAckVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB,
new GossipDigestAck2VerbHandler());
+ MessagingService.instance().registerVerbHandlers(JOIN_VERB_HANDLER,
new JoinVerbHandler());
+
MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new
GossipDigestSynVerbHandler());
+
MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new
GossipDigestAckVerbHandler());
+
MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new
GossipDigestAck2VerbHandler());
/* register the Gossip stage */
StageManager.registerStage( Gossiper.GOSSIP_STAGE, new
SingleThreadedStage("GMFD") );
}
@@ -387,7 +387,7 @@
EndPoint to = eps.get(++rrIndex_);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to " + to + "
...");
- MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ MessagingService.instance().sendUdpOneWay(message, to);
return seeds_.contains(to);
}
@@ -407,7 +407,7 @@
EndPoint to = liveEndPoints.get(index);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to " + to + "
...");
- MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ MessagingService.instance().sendUdpOneWay(message, to);
return seeds_.contains(to);
}
@@ -994,7 +994,7 @@
Message gDigestAckMessage =
Gossiper.instance().makeGossipDigestAckMessage(gDigestAck);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAckMessage to " + from);
-
MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAckMessage, from);
+ MessagingService.instance().sendUdpOneWay(gDigestAckMessage, from);
}
catch (IOException e)
{
@@ -1086,7 +1086,7 @@
Message gDigestAck2Message =
Gossiper.instance().makeGossipDigestAck2Message(gDigestAck2);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAck2Message to " + from);
-
MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAck2Message, from);
+ MessagingService.instance().sendUdpOneWay(gDigestAck2Message,
from);
}
catch ( IOException e )
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
Wed Oct 7 16:23:43 2009
@@ -33,7 +33,7 @@
public void run()
{
String verb = message_.getVerb();
- IVerbHandler verbHandler =
MessagingService.getMessagingInstance().getVerbHandler(verb);
+ IVerbHandler verbHandler =
MessagingService.instance().getVerbHandler(verb);
if ( verbHandler != null )
{
verbHandler.doVerb(message_);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Wed Oct 7 16:23:43 2009
@@ -50,6 +50,7 @@
public static final String responseVerbHandler_ = "RESPONSE";
/* Stage for responses. */
public static final String responseStage_ = "RESPONSE-STAGE";
+
private enum ReservedVerbs_ {
};
@@ -93,7 +94,7 @@
return version_;
}
- public static IMessagingService getMessagingInstance()
+ public static IMessagingService instance()
{
if ( bShutdown_ )
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
Wed Oct 7 16:23:43 2009
@@ -93,7 +93,7 @@
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Performing read repair for " + readCommand_.key +
" to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") +
"]");
- MessagingService.getMessagingInstance().sendRR(message,
replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
+ MessagingService.instance().sendRR(message,
replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
}
}
@@ -161,7 +161,7 @@
Message message =
readCommandDigestOnly.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Reading consistency digest for " +
readCommand_.key + " from " + message.getMessageId() + "@[" +
StringUtils.join(replicas_, ", ") + "]");
- MessagingService.getMessagingInstance().sendRR(message,
replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
+ MessagingService.instance().sendRR(message, replicas_.toArray(new
EndPoint[replicas_.size()]), new DigestResponseHandler());
}
catch (IOException ex)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
Wed Oct 7 16:23:43 2009
@@ -42,7 +42,7 @@
Message response = rangeReply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + rangeReply + " to " +
message.getMessageId() + "@" + message.getFrom());
- MessagingService.getMessagingInstance().sendOneWay(response,
message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
}
catch (Exception e)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
Wed Oct 7 16:23:43 2009
@@ -18,17 +18,11 @@
package org.apache.cassandra.service;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.locks.*;
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Header;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Cachetable;
@@ -72,7 +66,7 @@
{
String[] pieces = FBUtilities.strip(target, ":");
EndPoint to = new EndPoint(pieces[0],
Integer.parseInt(pieces[1]));
-
MessagingService.getMessagingInstance().sendOneWay(message, to);
+ MessagingService.instance().sendOneWay(message, to);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Wed Oct 7 16:23:43 2009
@@ -155,7 +155,7 @@
public void doVerb(Message message)
{
Message reply =
message.getReply(StorageService.getLocalStorageEndPoint(), new byte[]
{(byte)(isMoveable_.get() ? 1 : 0)});
- MessagingService.getMessagingInstance().sendOneWay(reply,
message.getFrom());
+ MessagingService.instance().sendOneWay(reply, message.getFrom());
if ( isMoveable_.get() )
{
// MoveMessage moveMessage =
(MoveMessage)message.getMessageBody()[0];
@@ -190,7 +190,7 @@
/* register the load balancer stage */
StageManager.registerStage(StorageLoadBalancer.lbStage_, new
SingleThreadedStage(StorageLoadBalancer.lbStage_));
/* register the load balancer verb handler */
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_,
new MoveMessageVerbHandler());
+
MessagingService.instance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_,
new MoveMessageVerbHandler());
}
public void start()
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Wed Oct 7 16:23:43 2009
@@ -122,7 +122,7 @@
EndPoint endpoint = entry.getKey();
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " +
message.getMessageId() + "@" + endpoint);
- MessagingService.getMessagingInstance().sendOneWay(message,
endpoint);
+ MessagingService.instance().sendOneWay(message, endpoint);
}
}
catch (IOException e)
@@ -161,7 +161,7 @@
logger.debug("insertBlocking writing key " + rm.key() + " to "
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ")
+ "]");
// Get all the targets and stick them in an array
- MessagingService.getMessagingInstance().sendRR(message,
primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
+ MessagingService.instance().sendRR(message,
primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
if (!quorumResponseHandler.get())
throw new UnavailableException();
if (primaryNodes.size() < endpointMap.size()) // Do we need to
bother with Hinted Handoff?
@@ -170,7 +170,7 @@
{
if (e.getKey() != e.getValue()) // Hinted Handoff to target
{
-
MessagingService.getMessagingInstance().sendOneWay(message, e.getKey());
+ MessagingService.instance().sendOneWay(message,
e.getKey());
}
}
}
@@ -251,7 +251,7 @@
if (logger.isDebugEnabled())
logger.debug("weakreadremote reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
message.addHeader(ReadCommand.DO_REPAIR,
ReadCommand.DO_REPAIR.getBytes());
- iars.add(MessagingService.getMessagingInstance().sendRR(message,
endPoint));
+ iars.add(MessagingService.instance().sendRR(message, endPoint));
}
for (IAsyncResult iar: iars)
@@ -378,7 +378,7 @@
if (logger.isDebugEnabled())
logger.debug("strongread reading digest for " + command +
" from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
}
- MessagingService.getMessagingInstance().sendRR(messages,
endPoints, quorumResponseHandler);
+ MessagingService.instance().sendRR(messages, endPoints,
quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);
}
@@ -407,7 +407,7 @@
readResponseResolverRepair);
logger.info("DigestMismatchException: " + ex.getMessage());
Message messageRepair = command.makeReadMessage();
-
MessagingService.getMessagingInstance().sendRR(messageRepair,
commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
+ MessagingService.instance().sendRR(messageRepair,
commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
try
{
row = quorumResponseHandlerRepair.get();
@@ -478,7 +478,7 @@
Message message = command.getMessage();
if (logger.isDebugEnabled())
logger.debug("reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
- IAsyncResult iar =
MessagingService.getMessagingInstance().sendRR(message, endPoint);
+ IAsyncResult iar = MessagingService.instance().sendRR(message,
endPoint);
// read response
byte[] responseBody;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Oct 7 16:23:43 2009
@@ -43,14 +43,8 @@
import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.TException;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -263,19 +257,19 @@
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenVerbHandler_,
new TokenUpdateVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.binaryVerbHandler_,
new BinaryVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mutationVerbHandler_,
new RowMutationVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.readRepairVerbHandler_,
new ReadRepairVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.readVerbHandler_,
new ReadVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateVerbHandler_,
new Table.BootStrapInitiateVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateDoneVerbHandler_,
new StorageService.BootstrapInitiateDoneVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapTerminateVerbHandler_,
new StreamManager.BootstrapTerminateVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.dataFileVerbHandler_,
new DataFileVerbHandler() );
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_,
new MembershipCleanerVerbHandler() );
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_,
new BootstrapMetadataVerbHandler() );
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.rangeVerbHandler_,
new RangeVerbHandler());
-
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootstrapTokenVerbHandler_,
new IVerbHandler()
+ MessagingService.instance().registerVerbHandlers(tokenVerbHandler_,
new TokenUpdateVerbHandler());
+ MessagingService.instance().registerVerbHandlers(binaryVerbHandler_,
new BinaryVerbHandler());
+ MessagingService.instance().registerVerbHandlers(mutationVerbHandler_,
new RowMutationVerbHandler());
+
MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new
ReadRepairVerbHandler());
+ MessagingService.instance().registerVerbHandlers(readVerbHandler_, new
ReadVerbHandler());
+
MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_,
new Table.BootStrapInitiateVerbHandler());
+
MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_,
new StorageService.BootstrapInitiateDoneVerbHandler());
+
MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_,
new StreamManager.BootstrapTerminateVerbHandler());
+ MessagingService.instance().registerVerbHandlers(dataFileVerbHandler_,
new DataFileVerbHandler() );
+
MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_,
new MembershipCleanerVerbHandler() );
+
MessagingService.instance().registerVerbHandlers(bsMetadataVerbHandler_, new
BootstrapMetadataVerbHandler() );
+ MessagingService.instance().registerVerbHandlers(rangeVerbHandler_,
new RangeVerbHandler());
+
MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_,
new IVerbHandler()
{
public void doVerb(Message message)
{
@@ -290,7 +284,7 @@
{
throw new AssertionError();
}
- MessagingService.getMessagingInstance().sendOneWay(response,
message.getFrom());
+ MessagingService.instance().sendOneWay(response,
message.getFrom());
}
});
@@ -323,9 +317,9 @@
tcpAddr_ = new EndPoint(DatabaseDescriptor.getStoragePort());
udpAddr_ = new EndPoint(DatabaseDescriptor.getControlPort());
/* Listen for application messages */
- MessagingService.getMessagingInstance().listen(tcpAddr_);
+ MessagingService.instance().listen(tcpAddr_);
/* Listen for control messages */
- MessagingService.getMessagingInstance().listenUDP(udpAddr_);
+ MessagingService.instance().listenUDP(udpAddr_);
SelectorManager.getSelectorManager().start();
SelectorManager.getUdpSelectorManager().start();
@@ -394,7 +388,7 @@
{
Message message = new Message(getLocalStorageEndPoint(), "",
bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
BootstrapTokenCallback btc = new BootstrapTokenCallback();
- MessagingService.getMessagingInstance().sendRR(message, maxEndpoint,
btc);
+ MessagingService.instance().sendRR(message, maxEndpoint, btc);
return btc.getToken();
}
@@ -826,7 +820,7 @@
Message message =
BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
if (logger_.isDebugEnabled())
logger_.debug("Sending a bootstrap initiate message to " +
target + " ...");
- MessagingService.getMessagingInstance().sendOneWay(message,
target);
+ MessagingService.instance().sendOneWay(message, target);
if (logger_.isDebugEnabled())
logger_.debug("Waiting for transfer to " + target + " to
complete");
StreamManager.instance(target).waitForStreamCompletion();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
Wed Oct 7 16:23:43 2009
@@ -22,14 +22,12 @@
import java.io.IOException;
import java.util.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.io.StreamContextManager;
-import
org.apache.cassandra.service.StorageService.BootstrapInitiateDoneVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
@@ -120,7 +118,7 @@
File file = filesToStream_.get(0);
if (logger_.isDebugEnabled())
logger_.debug("Streaming file " + file + " ...");
-
MessagingService.getMessagingInstance().stream(file.getAbsolutePath(), 0L,
file.length(), StorageService.getLocalStorageEndPoint(), to_);
+ MessagingService.instance().stream(file.getAbsolutePath(), 0L,
file.length(), StorageService.getLocalStorageEndPoint(), to_);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
Wed Oct 7 16:23:43 2009
@@ -26,7 +26,6 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.EndPoint;
@@ -74,7 +73,7 @@
}
System.out.println("Sending a membership clean message to " + target);
- MessagingService.getMessagingInstance().sendOneWay(mbrshipCleanerMessage, target);
+ MessagingService.instance().sendOneWay(mbrshipCleanerMessage, target);
Thread.sleep(MembershipCleaner.waitTime_);
System.out.println("Done sending the update message");
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
Wed Oct 7 16:23:43 2009
@@ -71,7 +71,7 @@
if (logger_.isDebugEnabled())
logger_.debug("Sending a membership clean message to " +
targetNode);
- MessagingService.getMessagingInstance().sendOneWay(message, targetNode);
+ MessagingService.instance().sendOneWay(message, targetNode);
break;
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
Wed Oct 7 16:23:43 2009
@@ -18,13 +18,8 @@
package org.apache.cassandra.tools;
-import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
@@ -79,7 +74,7 @@
bos.toByteArray());
System.out.println("Sending a token update message to " + target);
- MessagingService.getMessagingInstance().sendOneWay(tokenUpdateMessage, target);
+ MessagingService.instance().sendOneWay(tokenUpdateMessage, target);
Thread.sleep(TokenUpdater.waitTime_);
System.out.println("Done sending the update message");
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=822793&r1=822792&r2=822793&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Wed Oct 7 16:23:43 2009
@@ -38,46 +38,8 @@
{
private static Logger logger_ = Logger.getLogger(FBUtilities.class);
- private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray();
-
private static InetAddress localInetAddress_;
- public static String getTimestamp()
- {
- Date date = new Date();
- DateFormat df = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
- return df.format(date);
- }
-
- public static String getTimestamp(long value)
- {
- Date date = new Date(value);
- DateFormat df = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
- return df.format(date);
- }
-
- public static int getBits(int x, int p, int n)
- {
- return ( x >>> (p + 1 - n)) & ~(~0 << n);
- }
-
- public static String getCurrentThreadStackTrace()
- {
- Throwable throwable = new Throwable();
- StackTraceElement[] ste = throwable.getStackTrace();
- StringBuilder sbuf = new StringBuilder();
-
- for ( int i = ste.length - 1; i > 0; --i )
- {
- sbuf.append(ste[i].getClassName())
- .append(".")
- .append(ste[i].getMethodName())
- .append("/");
- }
- sbuf.deleteCharAt(sbuf.length() - 1);
- return sbuf.toString();
- }
-
public static String[] strip(String string, String token)
{
StringTokenizer st = new StringTokenizer(string, token);
@@ -89,50 +51,6 @@
return result.toArray( new String[0] );
}
- public static byte[] serializeToStream(Object o)
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- byte[] bytes = new byte[0];
- try
- {
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(o);
- oos.flush();
- bytes = bos.toByteArray();
- oos.close();
- bos.close();
- }
- catch ( IOException e )
- {
- LogUtil.getLogger(FBUtilities.class.getName()).info( LogUtil.throwableToString(e) );
- }
- return bytes;
- }
-
- public static Object deserializeFromStream(byte[] bytes)
- {
- Object o = null;
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- try
- {
- ObjectInputStream ois = new ObjectInputStream(bis);
- try
- {
- o = ois.readObject();
- }
- catch ( ClassNotFoundException e )
- {
- }
- ois.close();
- bis.close();
- }
- catch ( IOException e )
- {
- LogUtil.getLogger(FBUtilities.class.getName()).info( LogUtil.throwableToString(e) );
- }
- return o;
- }
-
public static InetAddress getLocalAddress() throws UnknownHostException
{
if ( localInetAddress_ == null )
@@ -150,17 +68,6 @@
return inetAddr.getHostAddress();
}
- public static boolean isHostLocalHost(InetAddress host)
- {
- try {
- return getLocalAddress().equals(host);
- }
- catch ( UnknownHostException e )
- {
- return false;
- }
- }
-
public static byte[] toByteArray(int i)
{
byte[] bytes = new byte[4];
@@ -223,14 +130,6 @@
else return 1;
}
- public static String stackTrace(Throwable e)
- {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter( sw );
- e.printStackTrace(pw);
- return sw.toString();
- }
-
public static BigInteger hash(String data)
{
byte[] result = hash(HashingSchemes.MD5, data.getBytes());
@@ -254,11 +153,6 @@
return result;
}
- public static boolean isEqual(byte[] digestA, byte[] digestB)
- {
- return MessageDigest.isEqual(digestA, digestB);
- }
-
// The given byte array is compressed onto the specified stream.
// The method does not close the stream. The caller will have to do it.
public static void compressToStream(byte[] input, ByteArrayOutputStream
bos) throws IOException
@@ -281,21 +175,6 @@
}
- public static byte[] compress(byte[] input) throws IOException
- {
- // Create an expandable byte array to hold the compressed data.
- // You cannot use an array that's the same size as the orginal because
- // there is no guarantee that the compressed data will be smaller than
- // the uncompressed data.
- ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);
- compressToStream(input,bos);
- bos.close();
-
- // Get the compressed data
- return bos.toByteArray();
- }
-
-
public static byte[] decompress(byte[] compressedData, int off, int len)
throws IOException, DataFormatException
{
// Create the decompressor and give it the data to compress
@@ -318,58 +197,6 @@
return bos.toByteArray();
}
- public static byte[] decompress(byte[] compressedData) throws IOException, DataFormatException
- {
- return decompress(compressedData, 0, compressedData.length);
- }
-
- public static byte[] xor(byte[] b1, byte[] b2)
- {
- assert b1 != null;
- assert b2 != null;
- byte[] bLess;
- byte[] bMore;
-
- if(b1.length > b2.length)
- {
- bLess = b2;
- bMore = b1;
- }
- else
- {
- bLess = b1;
- bMore = b2;
- }
-
- for(int i = 0 ; i< bLess.length; i++ )
- {
- bMore[i] = (byte)(bMore[i] ^ bLess[i]);
- }
-
- return bMore;
- }
-
- public static int getUTF8Length(String string)
- {
- /*
- * We store the string as UTF-8 encoded, so when we calculate the length, it
- * should be converted to UTF-8.
- */
- String utfName = string;
- int length = utfName.length();
- try
- {
- //utfName = new String(string.getBytes("UTF-8"));
- length = string.getBytes("UTF-8").length;
- }
- catch (UnsupportedEncodingException e)
- {
- LogUtil.getLogger(FBUtilities.class.getName()).info(LogUtil.throwableToString(e));
- }
-
- return length;
- }
-
public static void writeByteArray(byte[] bytes, DataOutput out) throws
IOException
{
out.writeInt(bytes.length);