Author: jbellis
Date: Fri Oct 9 17:07:24 2009
New Revision: 823617
URL: http://svn.apache.org/viewvc?rev=823617&view=rev
Log:
fix regressions
patch by jbellis reviewed by Eric Evans for CASSANDRA-477
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Oct 9 17:07:24 2009
@@ -37,6 +37,7 @@
import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.net.io.IStreamComplete;
@@ -208,6 +209,7 @@
if (!maxEndpoint.equals(StorageService.getLocalStorageEndPoint()))
{
+ StorageService.instance().retrofitPorts(Arrays.asList(maxEndpoint));
Token<?> t = getBootstrapTokenFrom(maxEndpoint);
logger_.info("Setting token to " + t + " to assume load from "
+ maxEndpoint.getHost());
ss.updateToken(t);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Oct 9 17:07:24 2009
@@ -95,7 +95,7 @@
final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
/* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
- final static int intervalInMillis_ = 1000;
+ public final static int intervalInMillis_ = 1000;
private static Logger logger_ = Logger.getLogger(Gossiper.class);
static Gossiper gossiper_;
@@ -522,6 +522,8 @@
{
reqdEndPointState = new EndPointState(epState.getHeartBeatState());
}
+ if (logger_.isTraceEnabled())
+ logger_.trace("Adding state " + key + ": " + appState.getState());
reqdEndPointState.addApplicationState(key, appState);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Fri Oct 9 17:07:24 2009
@@ -73,7 +73,7 @@
* This method changes the ports of the endpoints from
* the control port to the storage ports.
*/
- protected void retrofitPorts(List<EndPoint> eps)
+ public void retrofitPorts(List<EndPoint> eps)
{
for ( EndPoint ep : eps )
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Fri Oct 9 17:07:24 2009
@@ -351,8 +351,9 @@
public void startBroadcasting()
{
- /* starts a load timer thread */
- loadTimer_.schedule(new LoadDisseminator(), BROADCAST_INTERVAL, BROADCAST_INTERVAL);
+ // send the first broadcast "right away" (i.e., in 2 gossip heartbeats, when we should have someone to talk to);
+ // after that send every BROADCAST_INTERVAL.
+ loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL);
}
/** wait for node information to be available. if the rest of the cluster just came up,
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Oct 9 17:07:24 2009
@@ -226,12 +226,12 @@
MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
+ MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new BootStrapper.BootStrapInitiateVerbHandler());
MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new BootStrapper.BootstrapInitiateDoneVerbHandler());
MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new BootStrapper.BootstrapTerminateVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
-
+
StageManager.registerStage(StorageService.mutationStage_,
new
MultiThreadedStage(StorageService.mutationStage_,
DatabaseDescriptor.getConcurrentWriters()));
StageManager.registerStage(StorageService.readStage_,
@@ -266,15 +266,24 @@
StorageLoadBalancer.instance().startBroadcasting();
+ // have to start the gossip service before we can see any info on other nodes. this is necessary
+ // for bootstrap to get the load info it needs.
+ // (we won't be part of the storage ring though until we add a nodeId to our state, below.)
+ Gossiper.instance().register(this);
+ Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
+
if (isBootstrapMode)
{
- BootStrapper.startBootstrap();
+ BootStrapper.startBootstrap(); // handles token update
+ }
+ else
+ {
+ tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
}
- Gossiper.instance().register(this);
- Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
- /* Make sure this token gets gossiped around. */
- tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
+ // Gossip my token.
+ // note that before we do this we've (a) finalized what the token is actually going to be, and
+ // (b) added a bootstrap state (done by startBootstrap)
ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getToken()));
Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
}
@@ -935,6 +944,11 @@
return nodePicker_.getHintedStorageEndPoints(partitioner_.getToken(key));
}
+ public void retrofitPorts(List<EndPoint> eps)
+ {
+ nodePicker_.retrofitPorts(eps);
+ }
+
/**
* This function finds the most suitable endpoint given a key.
* It checks for locality and alive test.