Author: gdusbabek
Date: Mon Jan 31 16:30:16 2011
New Revision: 1065676

URL: http://svn.apache.org/viewvc?rev=1065676&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7:1026516-1064915
+/cassandra/branches/cassandra-0.7:1026516-1065665
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jan 31 16:30:16 2011
@@ -58,7 +58,9 @@
    (CASSANDRA-2058)
  * fix math in RandomPartitioner.describeOwnership (CASSANDRA-2071)
  * fix deletion of sstable non-data components (CASSANDRA-2059)
-
+ * avoid blocking gossip while deleting handoff hints (CASSANDRA-2073)
+ * ignore messages from newer versions, keep track of nodes in gossip 
+   regardless of version (CASSANDRA-1970)
 
 0.7.0-final
  * fix offsets to ByteBuffer.get (CASSANDRA-1939)

Modified: cassandra/trunk/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Mon Jan 31 16:30:16 2011
@@ -225,7 +225,7 @@ rpc_timeout_in_ms: 10000
 # org.apache.cassandra.locator.PropertyFileSnitch:
 #  - Proximity is determined by rack and data center, which are
 #    explicitly configured in cassandra-topology.properties.
-endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+endpoint_snitch: org.apache.cassandra.locator.PropertyFileSnitch
 
 # dynamic_snitch -- This boolean controls whether the above snitch is
 # wrapped with a dynamic snitch, which will monitor read latencies

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1065665
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1065665
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1065665
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1065665
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1065665
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Mon Jan 31 16:30:16 2011
@@ -120,8 +120,17 @@ public class DatabaseDescriptor
         {
             URL url = getStorageConfigURL();
             logger.info("Loading settings from " + url);
-            
-            InputStream input = url.openStream();
+
+            InputStream input = null;
+            try
+            {
+                input = url.openStream();
+            }
+            catch (IOException e)
+            {
+                // getStorageConfigURL should have ruled this out
+                throw new AssertionError(e);
+            }
             org.yaml.snakeyaml.constructor.Constructor constructor = new 
org.yaml.snakeyaml.constructor.Constructor(Config.class);
             TypeDescription desc = new TypeDescription(Config.class);
             desc.putListPropertyType("keyspaces", RawKeyspace.class);
@@ -260,7 +269,16 @@ public class DatabaseDescriptor
             
             /* Local IP or hostname to bind RPC server to */
             if (conf.rpc_address != null)
-                rpcAddress = InetAddress.getByName(conf.rpc_address);
+            {
+                try
+                {
+                    rpcAddress = InetAddress.getByName(conf.rpc_address);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new ConfigurationException("Unknown host in 
rpc_address " + conf.rpc_address);
+                }
+            }
 
             if (conf.thrift_framed_transport_size_in_mb > 0 && 
conf.thrift_max_message_length_in_mb < conf.thrift_framed_transport_size_in_mb)
             {
@@ -298,6 +316,10 @@ public class DatabaseDescriptor
                 {
                     throw new ConfigurationException("Invalid Request 
Scheduler class " + conf.request_scheduler);
                 }
+                catch (Exception e)
+                {
+                    throw new ConfigurationException("Unable to instantiate 
request scheduler", e);
+                }
             }
             else
             {
@@ -374,33 +396,33 @@ public class DatabaseDescriptor
             {
                 throw new ConfigurationException("seeds configuration is 
missing; a minimum of one seed is required.");
             }
-            Class seedProviderClass = 
Class.forName(conf.seed_provider.class_name);
-            seedProvider = 
(SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
+            try 
+            {
+                Class seedProviderClass = 
Class.forName(conf.seed_provider.class_name);
+                seedProvider = 
(SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
+            }
+            // there are about 5 checked exceptions that could be thrown here.
+            catch (Exception e)
+            {
+                logger.error("Fatal configuration error", e);
+                System.err.println(e.getMessage() + "\nFatal configuration 
error; unable to start server.  See log for stacktrace.");
+                System.exit(1);
+            }
             if (seedProvider.getSeeds().size() == 0)
                 throw new ConfigurationException("The seed provider lists no 
seeds.");
         }
-        catch (UnknownHostException e)
-        {
-            logger.error("Fatal error: " + e.getMessage());
-            System.err.println("Unable to start with unknown hosts configured. 
 Use IP addresses instead of hostnames.");
-            System.exit(2);
-        }
         catch (ConfigurationException e)
         {
-            logger.error("Fatal error: " + e.getMessage(), e);
-            System.err.println("Bad configuration; unable to start server");
+            logger.error("Fatal configuration error", e);
+            System.err.println(e.getMessage() + "\nFatal configuration error; 
unable to start server.  See log for stacktrace.");
             System.exit(1);
         }
         catch (YAMLException e)
         {
-            logger.error("Fatal error: " + e.getMessage(), e);
-            System.err.println("Bad configuration; unable to start server");
+            logger.error("Fatal configuration error error", e);
+            System.err.println(e.getMessage() + "\nInvalid yaml; unable to 
start server.  See log for stacktrace.");
             System.exit(1);
         }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
     }
 
     private static IEndpointSnitch createEndpointSnitch(String 
endpointSnitchClassName) throws ConfigurationException

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
Mon Jan 31 16:30:16 2011
@@ -144,21 +144,31 @@ public class HintedHandOffManager
         rm.apply();
     }                                                         
 
-    public static void deleteHintsForEndPoint(InetAddress endpoint)
+    public static void deleteHintsForEndPoint(final InetAddress endpoint)
     {
-        ColumnFamilyStore hintStore = 
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
ByteBuffer.wrap(endpoint.getAddress()));
+        final ColumnFamilyStore hintStore = 
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+        final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
ByteBuffer.wrap(endpoint.getAddress()));
         rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
-        try {
-            logger_.info("Deleting any stored hints for " + endpoint);
-            rm.apply();
-            hintStore.forceFlush();
-            CompactionManager.instance.submitMajor(hintStore, 0, 
Integer.MAX_VALUE).get();
-        }
-        catch (Exception e)
+
+        // execute asynchronously to avoid blocking caller (which may be 
processing gossip)
+        Runnable runnable = new Runnable()
         {
-            logger_.warn("Could not delete hints for " + endpoint + ": " + e);
-        }
+            public void run()
+            {
+                try
+                {
+                    logger_.info("Deleting any stored hints for " + endpoint);
+                    rm.apply();
+                    hintStore.forceFlush();
+                    CompactionManager.instance.submitMajor(hintStore, 0, 
Integer.MAX_VALUE);
+                }
+                catch (Exception e)
+                {
+                    logger_.warn("Could not delete hints for " + endpoint + ": 
" + e);
+                }
+            }
+        };
+        StorageService.scheduledTasks.execute(runnable);
     }
 
     private static boolean pagingFinished(ColumnFamily hintColumnFamily, 
ByteBuffer startColumn)

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jan 31 
16:30:16 2011
@@ -26,6 +26,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,6 +142,10 @@ public class Gossiper implements IFailur
      * after removal to prevent nodes from falsely reincarnating during the 
time when removal
      * gossip gets propagated to all nodes */
     Map<InetAddress, Long> justRemovedEndpoints_ = new 
ConcurrentHashMap<InetAddress, Long>();
+    
+    // protocol versions of the other nodes in the cluster
+    private final ConcurrentMap<InetAddress, Integer> versions = new 
NonBlockingHashMap<InetAddress, Integer>();
+    
 
     private Gossiper()
     {
@@ -169,6 +174,20 @@ public class Gossiper implements IFailur
     {
         subscribers_.remove(subscriber);
     }
+    
+    public void setVersion(InetAddress address, int version)
+    {
+        Integer old = versions.put(address, version);
+        EndpointState state = endpointStateMap_.get(address);
+        if (state == null)
+            addSavedEndpoint(address);
+    }
+    
+    public Integer getVersion(InetAddress address)
+    {
+        return versions.get(address);
+    }
+    
 
     public Set<InetAddress> getLiveMembers()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java Mon 
Jan 31 16:30:16 2011
@@ -89,7 +89,7 @@ public class Ec2Snitch extends AbstractN
     {
         // Share EC2 info via gossip.  We have to wait until Gossiper is 
initialized though.
         logger.info("Ec2Snitch adding ApplicationState ec2region=" + ec2region 
+ " ec2zone=" + ec2zone);
-        Gossiper.instance.addLocalApplicationState(ApplicationState.DC, 
StorageService.valueFactory.datacenter(ec2region));
-        Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, 
StorageService.valueFactory.rack(ec2zone));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.DC, 
StorageService.instance.valueFactory.datacenter(ec2region));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, 
StorageService.instance.valueFactory.rack(ec2zone));
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java 
Mon Jan 31 16:30:16 2011
@@ -50,22 +50,22 @@ public class TokenMetadata
     // for any nodes that boot simultaneously between same two nodes. For this 
we cannot simply make pending ranges a <tt>Multimap</tt>,
     // since that would make us unable to notice the real problem of two nodes 
trying to boot using the same token.
     // In order to do this properly, we need to know what tokens are booting 
at any time.
-    private BiMap<Token, InetAddress> bootstrapTokens;
+    private BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.create();
 
     // we will need to know at all times what nodes are leaving and calculate 
ranges accordingly.
     // An anonymous pending ranges list is not enough, as that does not tell 
which node is leaving
     // and/or if the ranges are there because of bootstrap or leave operation.
     // (See CASSANDRA-603 for more detail + examples).
-    private Set<InetAddress> leavingEndpoints;
+    private Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
 
-    private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges;
+    private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges 
= new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
     private ArrayList<Token> sortedTokens;
 
     /* list of subscribers that are notified when the tokenToEndpointMap 
changed */
-    private final CopyOnWriteArrayList<AbstractReplicationStrategy> 
subscribers;
+    private final CopyOnWriteArrayList<AbstractReplicationStrategy> 
subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
 
     public TokenMetadata()
     {
@@ -77,11 +77,7 @@ public class TokenMetadata
         if (tokenToEndpointMap == null)
             tokenToEndpointMap = HashBiMap.create();
         this.tokenToEndpointMap = tokenToEndpointMap;
-        bootstrapTokens = HashBiMap.create();
-        leavingEndpoints = new HashSet<InetAddress>();
-        pendingRanges = new ConcurrentHashMap<String, Multimap<Range, 
InetAddress>>();
         sortedTokens = sortTokens();
-        subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
     }
 
     private ArrayList<Token> sortTokens()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
Mon Jan 31 16:30:16 2011
@@ -24,6 +24,7 @@ package org.apache.cassandra.net;
 import java.io.*;
 import java.net.Socket;
 
+import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +56,7 @@ public class IncomingTcpConnection exten
     {
         DataInputStream input;
         boolean isStream;
+        int version;
         try
         {
             // determine the connection type to decide whether to buffer
@@ -65,6 +67,8 @@ public class IncomingTcpConnection exten
             if (!isStream)
                 // we should buffer
                 input = new DataInputStream(new 
BufferedInputStream(socket.getInputStream(), 4096));
+            version = MessagingService.getBits(header, 15, 8);
+            Gossiper.instance.setVersion(socket.getInetAddress(), version);
         }
         catch (IOException e)
         {
@@ -77,6 +81,12 @@ public class IncomingTcpConnection exten
             {
                 if (isStream)
                 {
+                    if (version > MessagingService.version_)
+                    {
+                        logger.error("Received untranslated stream from newer 
protcol version. Terminating connection!");
+                        close();
+                        return;
+                    }
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
@@ -89,12 +99,18 @@ public class IncomingTcpConnection exten
                     byte[] contentBytes = new byte[size];
                     input.readFully(contentBytes);
                     
-                    Message message = Message.serializer().deserialize(new 
DataInputStream(new ByteArrayInputStream(contentBytes)));
-                    MessagingService.instance().receive(message);
+                    if (version > MessagingService.version_)
+                        logger.info("Received connection from newer protocol 
version. Ignorning message.");
+                    else
+                    {
+                        Message message = Message.serializer().deserialize(new 
DataInputStream(new ByteArrayInputStream(contentBytes)));
+                        MessagingService.instance().receive(message);
+                    }
                 }
                 // prepare to read the next message
                 MessagingService.validateMagic(input.readInt());
                 int header = input.readInt();
+                version = MessagingService.getBits(header, 15, 8);
                 assert isStream == (MessagingService.getBits(header, 3, 1) == 
1) : "Connections cannot change type: " + isStream;
             }
             catch (EOFException e)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon 
Jan 31 16:30:16 2011
@@ -60,7 +60,7 @@ import org.cliffc.high_scale_lib.NonBloc
 
 public final class MessagingService implements MessagingServiceMBean
 {
-    private static final int version_ = 1;
+    public static final int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is 
appropriate.
     private SerializerType serializerType_ = SerializerType.BINARY;
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
 Mon Jan 31 16:30:16 2011
@@ -189,8 +189,8 @@ public abstract class AbstractCassandraD
         }
         catch (ConfigurationException e)
         {
-            logger.error("Fatal error: " + e.getMessage());
-            System.err.println("Bad configuration; unable to start server");
+            logger.error("Fatal configuration error", e);
+            System.err.println(e.getMessage() + "\nFatal configuration error; 
unable to start server.  See log for stacktrace.");
             System.exit(1);
         }
 
@@ -213,8 +213,7 @@ public abstract class AbstractCassandraD
     
     /**
      * Start the Cassandra Daemon, assuming that it has already been
-     * initialized, via either {@link #init(String[])} or
-     * {@link #load(String[])}.
+     * initialized via {@link #init(String[])}
      *
      * Hook for JSVC
      *

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java 
Mon Jan 31 16:30:16 2011
@@ -97,7 +97,7 @@ public class MigrationManager implements
             MessagingService.instance().sendOneWay(msg, host);
         // this is for notifying nodes as they arrive in the cluster.
         if (!StorageService.instance.isClientMode())
-            
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.valueFactory.migration(version));
+            
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.migration(version));
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java 
Mon Jan 31 16:30:16 2011
@@ -348,7 +348,7 @@ public class StorageLoadBalancer impleme
                 if (logger_.isDebugEnabled())
                     logger_.debug("Disseminating load info ...");
                 
Gossiper.instance.addLocalApplicationState(ApplicationState.LOAD,
-                                                           
StorageService.valueFactory.load(StorageService.instance.getLoad()));
+                                                           
StorageService.instance.valueFactory.load(StorageService.instance.getLoad()));
             }
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * 
Gossiper.intervalInMillis_, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Mon Jan 31 16:30:16 2011
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import com.google.common.base.Charsets;
@@ -151,13 +150,17 @@ public class StorageService implements I
 
     public static final RetryingScheduledThreadPoolExecutor scheduledTasks = 
new RetryingScheduledThreadPoolExecutor("ScheduledTasks");
 
-    private static IPartitioner partitioner_ = 
DatabaseDescriptor.getPartitioner();
-    public static VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner_);
+    /* This abstraction maintains the token/endpoint metadata information */
+    private TokenMetadata tokenMetadata_ = new TokenMetadata();
+
+    private IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+    public VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner);
     
     public static final StorageService instance = new StorageService();
 
-    public static IPartitioner getPartitioner() {
-        return partitioner_;
+    public static IPartitioner getPartitioner()
+    {
+        return instance.partitioner;
     }
 
     public Collection<Range> getLocalRanges(String table)
@@ -170,9 +173,6 @@ public class StorageService implements I
         return getPrimaryRangeForEndpoint(FBUtilities.getLocalAddress());
     }
 
-    /* This abstraction maintains the token/endpoint metadata information */
-    private TokenMetadata tokenMetadata_ = new TokenMetadata();
-
     private Set<InetAddress> replicatingNodes = 
Collections.synchronizedSet(new HashSet<InetAddress>());
     private CassandraDaemon daemon;
 
@@ -464,12 +464,12 @@ public class StorageService implements I
                 String initialToken = DatabaseDescriptor.getInitialToken();
                 if (initialToken == null)
                 {
-                    token = partitioner_.getRandomToken();
+                    token = partitioner.getRandomToken();
                     logger_.warn("Generated random token " + token + ". Random 
tokens will result in an unbalanced ring; see 
http://wiki.apache.org/cassandra/Operations";);
                 }
                 else
                 {
-                    token = 
partitioner_.getTokenFactory().fromString(initialToken);
+                    token = 
partitioner.getTokenFactory().fromString(initialToken);
                     logger_.info("Saved token not found. Using " + token + " 
from configuration");
                 }
             }
@@ -1449,7 +1449,7 @@ public class StorageService implements I
      */
     public List<InetAddress> getNaturalEndpoints(String table, ByteBuffer key)
     {
-        return getNaturalEndpoints(table, partitioner_.getToken(key));
+        return getNaturalEndpoints(table, partitioner.getToken(key));
     }
 
     /**
@@ -1473,7 +1473,7 @@ public class StorageService implements I
      */
     public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer 
key)
     {
-        return getLiveNaturalEndpoints(table, partitioner_.getToken(key));
+        return getLiveNaturalEndpoints(table, partitioner.getToken(key));
     }
 
     public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
@@ -1546,7 +1546,7 @@ public class StorageService implements I
         FBUtilities.sortSampledKeys(keys, range);
 
         if (keys.size() < 3)
-            return partitioner_.midpoint(range.left, range.right);
+            return partitioner.midpoint(range.left, range.right);
         else
             return keys.get(keys.size() / 2).token;
     }
@@ -1670,7 +1670,7 @@ public class StorageService implements I
 
     public void move(String newToken) throws IOException, InterruptedException
     {
-        move(partitioner_.getTokenFactory().fromString(newToken));
+        move(partitioner.getTokenFactory().fromString(newToken));
     }
 
     public void loadBalance() throws IOException, InterruptedException
@@ -1760,7 +1760,7 @@ public class StorageService implements I
     {
         InetAddress myAddress = FBUtilities.getLocalAddress();
         Token localToken = tokenMetadata_.getToken(myAddress);
-        Token token = partitioner_.getTokenFactory().fromString(tokenString);
+        Token token = partitioner.getTokenFactory().fromString(tokenString);
         InetAddress endpoint = tokenMetadata_.getEndpoint(token);
 
         if (endpoint == null)
@@ -2075,9 +2075,9 @@ public class StorageService implements I
     // Never ever do this at home. Used by tests.
     IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
     {
-        IPartitioner oldPartitioner = partitioner_;
-        partitioner_ = newPartitioner;
-        valueFactory = new VersionedValue.VersionedValueFactory(partitioner_);
+        IPartitioner oldPartitioner = partitioner;
+        partitioner = newPartitioner;
+        valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
         return oldPartitioner;
     }
 
@@ -2110,7 +2110,7 @@ public class StorageService implements I
     {
         List<Token> sortedTokens = new 
ArrayList<Token>(getTokenToEndpointMap().keySet());
         Collections.sort(sortedTokens);
-        return partitioner_.describeOwnership(sortedTokens);
+        return partitioner.describeOwnership(sortedTokens);
     }
 
     public List<String> getKeyspaces()

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java 
Mon Jan 31 16:30:16 2011
@@ -85,7 +85,7 @@ public class BootStrapperTest extends Cl
             Range range = ss.getPrimaryRangeForEndpoint(bootstrapSource);
             Token token = StorageService.getPartitioner().midpoint(range.left, 
range.right);
             assert range.contains(token);
-            ss.onChange(bootstrapAddrs[i], ApplicationState.STATUS, 
StorageService.valueFactory.bootstrapping(token));
+            ss.onChange(bootstrapAddrs[i], ApplicationState.STATUS, 
StorageService.instance.valueFactory.bootstrapping(token));
         }
         
         // any further attempt to bootsrtap should fail since every node in 
the cluster is splitting.
@@ -102,7 +102,7 @@ public class BootStrapperTest extends Cl
         // indicate that one of the nodes is done. see if the node it was 
bootstrapping from is still available.
         Range range = ss.getPrimaryRangeForEndpoint(addrs[2]);
         Token token = StorageService.getPartitioner().midpoint(range.left, 
range.right);
-        ss.onChange(bootstrapAddrs[2], ApplicationState.STATUS, 
StorageService.valueFactory.normal(token));
+        ss.onChange(bootstrapAddrs[2], ApplicationState.STATUS, 
StorageService.instance.valueFactory.normal(token));
         load.put(bootstrapAddrs[2], 0d);
         InetAddress addr = 
BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load);
         assert addr != null && addr.equals(addrs[2]);
@@ -134,7 +134,7 @@ public class BootStrapperTest extends Cl
         Range range5 = ss.getPrimaryRangeForEndpoint(five);
         Token fakeToken = 
StorageService.getPartitioner().midpoint(range5.left, range5.right);
         assert range5.contains(fakeToken);
-        ss.onChange(myEndpoint, ApplicationState.STATUS, 
StorageService.valueFactory.bootstrapping(fakeToken));
+        ss.onChange(myEndpoint, ApplicationState.STATUS, 
StorageService.instance.valueFactory.bootstrapping(fakeToken));
         tmd = ss.getTokenMetadata();
 
         InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);


Reply via email to