Author: jbellis
Date: Sat Aug 13 06:00:13 2011
New Revision: 1157331
URL: http://svn.apache.org/viewvc?rev=1157331&view=rev
Log:
remove unused parts of StorageLoadBalancer and rename to LoadBroadcaster
patch by jbellis
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java?rev=1157331&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
Sat Aug 13 06:00:13 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.gms.*;
+
+/**
+ * Periodically publishes this node's load through gossip and records the load
+ * values that other endpoints publish.
+ *
+ * The class registers itself with the Gossiper on construction and keeps a map
+ * from endpoint address to the most recent LOAD application state seen for it.
+ */
+public class LoadBroadcaster implements IEndpointStateChangeSubscriber
+{
+    /** Delay between two consecutive load broadcasts, in milliseconds. */
+    private static final int BROADCAST_INTERVAL = 60 * 1000;
+
+    // Declared before the singleton so the logger is already initialised if
+    // anything reached from the constructor ever needs to log.
+    private static final Logger logger_ = LoggerFactory.getLogger(LoadBroadcaster.class);
+
+    public static final LoadBroadcaster instance = new LoadBroadcaster();
+
+    // The map is written from the gossip callbacks below and handed out, live,
+    // by getLoadInfo(). NOTE(review): callbacks and readers such as
+    // StorageService.getLoadMap() presumably run on different threads - a plain
+    // HashMap would then be unsafe to iterate while it is being updated, so a
+    // concurrent map is used.
+    private final Map<InetAddress, Double> loadInfo_ = new ConcurrentHashMap<InetAddress, Double>();
+
+    private LoadBroadcaster()
+    {
+        Gossiper.instance.register(this);
+    }
+
+    /**
+     * Records the load advertised by an endpoint. Changes to any application
+     * state other than LOAD are ignored.
+     *
+     * @param endpoint the endpoint whose state changed
+     * @param state    the application state that changed
+     * @param value    the new value; for LOAD it is parsed as a double
+     */
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+    {
+        if (state != ApplicationState.LOAD)
+            return;
+        loadInfo_.put(endpoint, Double.valueOf(value.value));
+    }
+
+    /**
+     * Picks up the LOAD value already present in the endpoint state of a
+     * joining endpoint, if there is one.
+     *
+     * @param endpoint the endpoint that joined
+     * @param epState  the endpoint state announced for it
+     */
+    public void onJoin(InetAddress endpoint, EndpointState epState)
+    {
+        VersionedValue localValue = epState.getApplicationState(ApplicationState.LOAD);
+        if (localValue != null)
+        {
+            onChange(endpoint, ApplicationState.LOAD, localValue);
+        }
+    }
+
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    /**
+     * Forgets the load recorded for an endpoint that has been removed, so that
+     * stale entries do not linger in the map returned by getLoadInfo().
+     *
+     * @param endpoint the endpoint that was removed
+     */
+    public void onRemove(InetAddress endpoint)
+    {
+        loadInfo_.remove(endpoint);
+    }
+
+    /**
+     * @return the live map from endpoint address to the last load value
+     *         recorded for it; it may change while the caller holds it
+     */
+    public Map<InetAddress, Double> getLoadInfo()
+    {
+        return loadInfo_;
+    }
+
+    /**
+     * Schedules the recurring task that publishes the local load as the LOAD
+     * application state.
+     */
+    public void startBroadcasting()
+    {
+        // send the first broadcast "right away" (i.e., in 2 gossip heartbeats, when we should have someone to talk to);
+        // after that send every BROADCAST_INTERVAL.
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Disseminating load info ...");
+                Gossiper.instance.addLocalApplicationState(ApplicationState.LOAD,
+                                                           StorageService.instance.valueFactory.load(StorageService.instance.getLoad()));
+            }
+        };
+        StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Wait for at least BROADCAST_INTERVAL ms, to give all nodes enough time to
+     * report in. The actual sleep is BROADCAST_INTERVAL plus
+     * StorageService.RING_DELAY milliseconds.
+     *
+     * @throws AssertionError if the sleeping thread is interrupted
+     */
+    public void waitForLoadInfo()
+    {
+        int duration = BROADCAST_INTERVAL + StorageService.RING_DELAY;
+        try
+        {
+            logger_.info("Sleeping {} ms to wait for load information...", duration);
+            Thread.sleep(duration);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+}
+
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1157331&r1=1157330&r2=1157331&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Sat Aug 13 06:00:13 2011
@@ -454,7 +454,7 @@ public class StorageService implements I
Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS,
valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
MessagingService.instance().listen(FBUtilities.getLocalAddress());
- StorageLoadBalancer.instance.startBroadcasting();
+ LoadBroadcaster.instance.startBroadcasting();
MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION,
valueFactory.releaseVersion());
@@ -470,7 +470,7 @@ public class StorageService implements I
&&
!(DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()) ||
SystemTable.isBootstrapped()))
{
setMode("Joining: getting load and schema information", true);
- StorageLoadBalancer.instance.waitForLoadInfo();
+ LoadBroadcaster.instance.waitForLoadInfo();
if (logger_.isDebugEnabled())
logger_.debug("... got load + schema info");
if (tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
@@ -479,7 +479,7 @@ public class StorageService implements I
throw new UnsupportedOperationException(s);
}
setMode("Joining: getting bootstrap token", true);
- token = BootStrapper.getBootstrapToken(tokenMetadata_,
StorageLoadBalancer.instance.getLoadInfo());
+ token = BootStrapper.getBootstrapToken(tokenMetadata_,
LoadBroadcaster.instance.getLoadInfo());
// don't bootstrap if there are no tables defined.
if (DatabaseDescriptor.getNonSystemTables().size() > 0)
{
@@ -1273,7 +1273,7 @@ public class StorageService implements I
public Map<String, String> getLoadMap()
{
Map<String, String> map = new HashMap<String, String>();
- for (Map.Entry<InetAddress,Double> entry :
StorageLoadBalancer.instance.getLoadInfo().entrySet())
+ for (Map.Entry<InetAddress,Double> entry :
LoadBroadcaster.instance.getLoadInfo().entrySet())
{
map.put(entry.getKey().getHostAddress(),
FileUtils.stringifyFileSize(entry.getValue()));
}