Author: jbellis
Date: Sat Aug 13 06:00:13 2011
New Revision: 1157331

URL: http://svn.apache.org/viewvc?rev=1157331&view=rev
Log:
remove unused parts of StorageLoadBalancer and rename to LoadBroadcaster
patch by jbellis

Added:
    cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
Removed:
    
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Added: 
cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java?rev=1157331&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java 
Sat Aug 13 06:00:13 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.gms.*;
+
+public class LoadBroadcaster implements IEndpointStateChangeSubscriber
+{
+    private static final int BROADCAST_INTERVAL = 60 * 1000;
+
+    public static final LoadBroadcaster instance = new LoadBroadcaster();
+
+    private static final Logger logger_ = 
LoggerFactory.getLogger(LoadBroadcaster.class);
+
+    private Map<InetAddress, Double> loadInfo_ = new HashMap<InetAddress, 
Double>();
+
+    private LoadBroadcaster()
+    {
+        Gossiper.instance.register(this);
+    }
+
+    public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value)
+    {
+        if (state != ApplicationState.LOAD)
+            return;
+        loadInfo_.put(endpoint, Double.valueOf(value.value));
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState)
+    {
+        VersionedValue localValue = 
epState.getApplicationState(ApplicationState.LOAD);
+        if (localValue != null)
+        {
+            onChange(endpoint, ApplicationState.LOAD, localValue);
+        }
+    }
+
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint) {}
+
+    public Map<InetAddress, Double> getLoadInfo()
+    {
+        return loadInfo_;
+    }
+
+    public void startBroadcasting()
+    {
+        // send the first broadcast "right away" (i.e., in 2 gossip 
heartbeats, when we should have someone to talk to);
+        // after that send every BROADCAST_INTERVAL.
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Disseminating load info ...");
+                
Gossiper.instance.addLocalApplicationState(ApplicationState.LOAD,
+                                                           
StorageService.instance.valueFactory.load(StorageService.instance.getLoad()));
+            }
+        };
+        StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * 
Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Wait for at least BROADCAST_INTERVAL ms, to give all nodes enough time 
to
+     * report in.
+     */
+    public void waitForLoadInfo()
+    {
+        int duration = BROADCAST_INTERVAL + StorageService.RING_DELAY;
+        try
+        {
+            logger_.info("Sleeping {} ms to wait for load information...", 
duration);
+            Thread.sleep(duration);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+}
+

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1157331&r1=1157330&r2=1157331&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Sat Aug 13 06:00:13 2011
@@ -454,7 +454,7 @@ public class StorageService implements I
         
Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS, 
valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
 
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
-        StorageLoadBalancer.instance.startBroadcasting();
+        LoadBroadcaster.instance.startBroadcasting();
         MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
         
Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, 
valueFactory.releaseVersion());
 
@@ -470,7 +470,7 @@ public class StorageService implements I
             && 
!(DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()) || 
SystemTable.isBootstrapped()))
         {
             setMode("Joining: getting load and schema information", true);
-            StorageLoadBalancer.instance.waitForLoadInfo();
+            LoadBroadcaster.instance.waitForLoadInfo();
             if (logger_.isDebugEnabled())
                 logger_.debug("... got load + schema info");
             if (tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
@@ -479,7 +479,7 @@ public class StorageService implements I
                 throw new UnsupportedOperationException(s);
             }
             setMode("Joining: getting bootstrap token", true);
-            token = BootStrapper.getBootstrapToken(tokenMetadata_, 
StorageLoadBalancer.instance.getLoadInfo());
+            token = BootStrapper.getBootstrapToken(tokenMetadata_, 
LoadBroadcaster.instance.getLoadInfo());
             // don't bootstrap if there are no tables defined.
             if (DatabaseDescriptor.getNonSystemTables().size() > 0)
             {
@@ -1273,7 +1273,7 @@ public class StorageService implements I
     public Map<String, String> getLoadMap()
     {
         Map<String, String> map = new HashMap<String, String>();
-        for (Map.Entry<InetAddress,Double> entry : 
StorageLoadBalancer.instance.getLoadInfo().entrySet())
+        for (Map.Entry<InetAddress,Double> entry : 
LoadBroadcaster.instance.getLoadInfo().entrySet())
         {
             map.put(entry.getKey().getHostAddress(), 
FileUtils.stringifyFileSize(entry.getValue()));
         }


Reply via email to