Author: gdusbabek
Date: Mon Jan 31 16:30:16 2011
New Revision: 1065676
URL: http://svn.apache.org/viewvc?rev=1065676&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7:1026516-1064915
+/cassandra/branches/cassandra-0.7:1026516-1065665
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jan 31 16:30:16 2011
@@ -58,7 +58,9 @@
(CASSANDRA-2058)
* fix math in RandomPartitioner.describeOwnership (CASSANDRA-2071)
* fix deletion of sstable non-data components (CASSANDRA-2059)
-
+ * avoid blocking gossip while deleting handoff hints (CASSANDRA-2073)
+ * ignore messages from newer versions, keep track of nodes in gossip
+ regardless of version (CASSANDRA-1970)
0.7.0-final
* fix offsets to ByteBuffer.get (CASSANDRA-1939)
Modified: cassandra/trunk/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Mon Jan 31 16:30:16 2011
@@ -225,7 +225,7 @@ rpc_timeout_in_ms: 10000
# org.apache.cassandra.locator.PropertyFileSnitch:
# - Proximity is determined by rack and data center, which are
# explicitly configured in cassandra-topology.properties.
-endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+endpoint_snitch: org.apache.cassandra.locator.PropertyFileSnitch
# dynamic_snitch -- This boolean controls whether the above snitch is
# wrapped with a dynamic snitch, which will monitor read latencies
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1065665
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1065665
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1065665
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1065665
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 31 16:30:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1064915
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1065665
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Mon Jan 31 16:30:16 2011
@@ -120,8 +120,17 @@ public class DatabaseDescriptor
{
URL url = getStorageConfigURL();
logger.info("Loading settings from " + url);
-
- InputStream input = url.openStream();
+
+ InputStream input = null;
+ try
+ {
+ input = url.openStream();
+ }
+ catch (IOException e)
+ {
+ // getStorageConfigURL should have ruled this out
+ throw new AssertionError(e);
+ }
org.yaml.snakeyaml.constructor.Constructor constructor = new
org.yaml.snakeyaml.constructor.Constructor(Config.class);
TypeDescription desc = new TypeDescription(Config.class);
desc.putListPropertyType("keyspaces", RawKeyspace.class);
@@ -260,7 +269,16 @@ public class DatabaseDescriptor
/* Local IP or hostname to bind RPC server to */
if (conf.rpc_address != null)
- rpcAddress = InetAddress.getByName(conf.rpc_address);
+ {
+ try
+ {
+ rpcAddress = InetAddress.getByName(conf.rpc_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown host in
rpc_address " + conf.rpc_address);
+ }
+ }
if (conf.thrift_framed_transport_size_in_mb > 0 &&
conf.thrift_max_message_length_in_mb < conf.thrift_framed_transport_size_in_mb)
{
@@ -298,6 +316,10 @@ public class DatabaseDescriptor
{
throw new ConfigurationException("Invalid Request
Scheduler class " + conf.request_scheduler);
}
+ catch (Exception e)
+ {
+ throw new ConfigurationException("Unable to instantiate
request scheduler", e);
+ }
}
else
{
@@ -374,33 +396,33 @@ public class DatabaseDescriptor
{
throw new ConfigurationException("seeds configuration is
missing; a minimum of one seed is required.");
}
- Class seedProviderClass =
Class.forName(conf.seed_provider.class_name);
- seedProvider =
(SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
+ try
+ {
+ Class seedProviderClass =
Class.forName(conf.seed_provider.class_name);
+ seedProvider =
(SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
+ }
+ // there are about 5 checked exceptions that could be thrown here.
+ catch (Exception e)
+ {
+ logger.error("Fatal configuration error", e);
+ System.err.println(e.getMessage() + "\nFatal configuration
error; unable to start server. See log for stacktrace.");
+ System.exit(1);
+ }
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no
seeds.");
}
- catch (UnknownHostException e)
- {
- logger.error("Fatal error: " + e.getMessage());
- System.err.println("Unable to start with unknown hosts configured.
Use IP addresses instead of hostnames.");
- System.exit(2);
- }
catch (ConfigurationException e)
{
- logger.error("Fatal error: " + e.getMessage(), e);
- System.err.println("Bad configuration; unable to start server");
+ logger.error("Fatal configuration error", e);
+ System.err.println(e.getMessage() + "\nFatal configuration error;
unable to start server. See log for stacktrace.");
System.exit(1);
}
catch (YAMLException e)
{
- logger.error("Fatal error: " + e.getMessage(), e);
- System.err.println("Bad configuration; unable to start server");
+ logger.error("Fatal configuration error error", e);
+ System.err.println(e.getMessage() + "\nInvalid yaml; unable to
start server. See log for stacktrace.");
System.exit(1);
}
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
}
private static IEndpointSnitch createEndpointSnitch(String
endpointSnitchClassName) throws ConfigurationException
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Mon Jan 31 16:30:16 2011
@@ -144,21 +144,31 @@ public class HintedHandOffManager
rm.apply();
}
- public static void deleteHintsForEndPoint(InetAddress endpoint)
+ public static void deleteHintsForEndPoint(final InetAddress endpoint)
{
- ColumnFamilyStore hintStore =
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE,
ByteBuffer.wrap(endpoint.getAddress()));
+ final ColumnFamilyStore hintStore =
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+ final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE,
ByteBuffer.wrap(endpoint.getAddress()));
rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
- try {
- logger_.info("Deleting any stored hints for " + endpoint);
- rm.apply();
- hintStore.forceFlush();
- CompactionManager.instance.submitMajor(hintStore, 0,
Integer.MAX_VALUE).get();
- }
- catch (Exception e)
+
+ // execute asynchronously to avoid blocking caller (which may be
processing gossip)
+ Runnable runnable = new Runnable()
{
- logger_.warn("Could not delete hints for " + endpoint + ": " + e);
- }
+ public void run()
+ {
+ try
+ {
+ logger_.info("Deleting any stored hints for " + endpoint);
+ rm.apply();
+ hintStore.forceFlush();
+ CompactionManager.instance.submitMajor(hintStore, 0,
Integer.MAX_VALUE);
+ }
+ catch (Exception e)
+ {
+ logger_.warn("Could not delete hints for " + endpoint + ":
" + e);
+ }
+ }
+ };
+ StorageService.scheduledTasks.execute(runnable);
}
private static boolean pagingFinished(ColumnFamily hintColumnFamily,
ByteBuffer startColumn)
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jan 31
16:30:16 2011
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,6 +142,10 @@ public class Gossiper implements IFailur
* after removal to prevent nodes from falsely reincarnating during the
time when removal
* gossip gets propagated to all nodes */
Map<InetAddress, Long> justRemovedEndpoints_ = new
ConcurrentHashMap<InetAddress, Long>();
+
+ // protocol versions of the other nodes in the cluster
+ private final ConcurrentMap<InetAddress, Integer> versions = new
NonBlockingHashMap<InetAddress, Integer>();
+
private Gossiper()
{
@@ -169,6 +174,20 @@ public class Gossiper implements IFailur
{
subscribers_.remove(subscriber);
}
+
+ public void setVersion(InetAddress address, int version)
+ {
+ Integer old = versions.put(address, version);
+ EndpointState state = endpointStateMap_.get(address);
+ if (state == null)
+ addSavedEndpoint(address);
+ }
+
+ public Integer getVersion(InetAddress address)
+ {
+ return versions.get(address);
+ }
+
public Set<InetAddress> getLiveMembers()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java Mon
Jan 31 16:30:16 2011
@@ -89,7 +89,7 @@ public class Ec2Snitch extends AbstractN
{
// Share EC2 info via gossip. We have to wait until Gossiper is
initialized though.
logger.info("Ec2Snitch adding ApplicationState ec2region=" + ec2region
+ " ec2zone=" + ec2zone);
- Gossiper.instance.addLocalApplicationState(ApplicationState.DC,
StorageService.valueFactory.datacenter(ec2region));
- Gossiper.instance.addLocalApplicationState(ApplicationState.RACK,
StorageService.valueFactory.rack(ec2zone));
+ Gossiper.instance.addLocalApplicationState(ApplicationState.DC,
StorageService.instance.valueFactory.datacenter(ec2region));
+ Gossiper.instance.addLocalApplicationState(ApplicationState.RACK,
StorageService.instance.valueFactory.rack(ec2zone));
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
Mon Jan 31 16:30:16 2011
@@ -50,22 +50,22 @@ public class TokenMetadata
// for any nodes that boot simultaneously between same two nodes. For this
we cannot simply make pending ranges a <tt>Multimap</tt>,
// since that would make us unable to notice the real problem of two nodes
trying to boot using the same token.
// In order to do this properly, we need to know what tokens are booting
at any time.
- private BiMap<Token, InetAddress> bootstrapTokens;
+ private BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.create();
// we will need to know at all times what nodes are leaving and calculate
ranges accordingly.
// An anonymous pending ranges list is not enough, as that does not tell
which node is leaving
// and/or if the ranges are there because of bootstrap or leave operation.
// (See CASSANDRA-603 for more detail + examples).
- private Set<InetAddress> leavingEndpoints;
+ private Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
- private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges;
+ private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges
= new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private ArrayList<Token> sortedTokens;
/* list of subscribers that are notified when the tokenToEndpointMap
changed */
- private final CopyOnWriteArrayList<AbstractReplicationStrategy>
subscribers;
+ private final CopyOnWriteArrayList<AbstractReplicationStrategy>
subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
public TokenMetadata()
{
@@ -77,11 +77,7 @@ public class TokenMetadata
if (tokenToEndpointMap == null)
tokenToEndpointMap = HashBiMap.create();
this.tokenToEndpointMap = tokenToEndpointMap;
- bootstrapTokens = HashBiMap.create();
- leavingEndpoints = new HashSet<InetAddress>();
- pendingRanges = new ConcurrentHashMap<String, Multimap<Range,
InetAddress>>();
sortedTokens = sortTokens();
- subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
}
private ArrayList<Token> sortTokens()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Mon Jan 31 16:30:16 2011
@@ -24,6 +24,7 @@ package org.apache.cassandra.net;
import java.io.*;
import java.net.Socket;
+import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class IncomingTcpConnection exten
{
DataInputStream input;
boolean isStream;
+ int version;
try
{
// determine the connection type to decide whether to buffer
@@ -65,6 +67,8 @@ public class IncomingTcpConnection exten
if (!isStream)
// we should buffer
input = new DataInputStream(new
BufferedInputStream(socket.getInputStream(), 4096));
+ version = MessagingService.getBits(header, 15, 8);
+ Gossiper.instance.setVersion(socket.getInetAddress(), version);
}
catch (IOException e)
{
@@ -77,6 +81,12 @@ public class IncomingTcpConnection exten
{
if (isStream)
{
+ if (version > MessagingService.version_)
+ {
+ logger.error("Received untranslated stream from newer
protcol version. Terminating connection!");
+ close();
+ return;
+ }
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
@@ -89,12 +99,18 @@ public class IncomingTcpConnection exten
byte[] contentBytes = new byte[size];
input.readFully(contentBytes);
- Message message = Message.serializer().deserialize(new
DataInputStream(new ByteArrayInputStream(contentBytes)));
- MessagingService.instance().receive(message);
+ if (version > MessagingService.version_)
+ logger.info("Received connection from newer protocol
version. Ignorning message.");
+ else
+ {
+ Message message = Message.serializer().deserialize(new
DataInputStream(new ByteArrayInputStream(contentBytes)));
+ MessagingService.instance().receive(message);
+ }
}
// prepare to read the next message
MessagingService.validateMagic(input.readInt());
int header = input.readInt();
+ version = MessagingService.getBits(header, 15, 8);
assert isStream == (MessagingService.getBits(header, 3, 1) ==
1) : "Connections cannot change type: " + isStream;
}
catch (EOFException e)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon
Jan 31 16:30:16 2011
@@ -60,7 +60,7 @@ import org.cliffc.high_scale_lib.NonBloc
public final class MessagingService implements MessagingServiceMBean
{
- private static final int version_ = 1;
+ public static final int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is
appropriate.
private SerializerType serializerType_ = SerializerType.BINARY;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
Mon Jan 31 16:30:16 2011
@@ -189,8 +189,8 @@ public abstract class AbstractCassandraD
}
catch (ConfigurationException e)
{
- logger.error("Fatal error: " + e.getMessage());
- System.err.println("Bad configuration; unable to start server");
+ logger.error("Fatal configuration error", e);
+ System.err.println(e.getMessage() + "\nFatal configuration error;
unable to start server. See log for stacktrace.");
System.exit(1);
}
@@ -213,8 +213,7 @@ public abstract class AbstractCassandraD
/**
* Start the Cassandra Daemon, assuming that it has already been
- * initialized, via either {@link #init(String[])} or
- * {@link #load(String[])}.
+ * initialized via {@link #init(String[])}
*
* Hook for JSVC
*
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
Mon Jan 31 16:30:16 2011
@@ -97,7 +97,7 @@ public class MigrationManager implements
MessagingService.instance().sendOneWay(msg, host);
// this is for notifying nodes as they arrive in the cluster.
if (!StorageService.instance.isClientMode())
-
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA,
StorageService.valueFactory.migration(version));
+
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA,
StorageService.instance.valueFactory.migration(version));
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Mon Jan 31 16:30:16 2011
@@ -348,7 +348,7 @@ public class StorageLoadBalancer impleme
if (logger_.isDebugEnabled())
logger_.debug("Disseminating load info ...");
Gossiper.instance.addLocalApplicationState(ApplicationState.LOAD,
-
StorageService.valueFactory.load(StorageService.instance.getLoad()));
+
StorageService.instance.valueFactory.load(StorageService.instance.getLoad()));
}
};
StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 *
Gossiper.intervalInMillis_, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Mon Jan 31 16:30:16 2011
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import com.google.common.base.Charsets;
@@ -151,13 +150,17 @@ public class StorageService implements I
public static final RetryingScheduledThreadPoolExecutor scheduledTasks =
new RetryingScheduledThreadPoolExecutor("ScheduledTasks");
- private static IPartitioner partitioner_ =
DatabaseDescriptor.getPartitioner();
- public static VersionedValue.VersionedValueFactory valueFactory = new
VersionedValue.VersionedValueFactory(partitioner_);
+ /* This abstraction maintains the token/endpoint metadata information */
+ private TokenMetadata tokenMetadata_ = new TokenMetadata();
+
+ private IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+ public VersionedValue.VersionedValueFactory valueFactory = new
VersionedValue.VersionedValueFactory(partitioner);
public static final StorageService instance = new StorageService();
- public static IPartitioner getPartitioner() {
- return partitioner_;
+ public static IPartitioner getPartitioner()
+ {
+ return instance.partitioner;
}
public Collection<Range> getLocalRanges(String table)
@@ -170,9 +173,6 @@ public class StorageService implements I
return getPrimaryRangeForEndpoint(FBUtilities.getLocalAddress());
}
- /* This abstraction maintains the token/endpoint metadata information */
- private TokenMetadata tokenMetadata_ = new TokenMetadata();
-
private Set<InetAddress> replicatingNodes =
Collections.synchronizedSet(new HashSet<InetAddress>());
private CassandraDaemon daemon;
@@ -464,12 +464,12 @@ public class StorageService implements I
String initialToken = DatabaseDescriptor.getInitialToken();
if (initialToken == null)
{
- token = partitioner_.getRandomToken();
+ token = partitioner.getRandomToken();
logger_.warn("Generated random token " + token + ". Random
tokens will result in an unbalanced ring; see
http://wiki.apache.org/cassandra/Operations");
}
else
{
- token =
partitioner_.getTokenFactory().fromString(initialToken);
+ token =
partitioner.getTokenFactory().fromString(initialToken);
logger_.info("Saved token not found. Using " + token + "
from configuration");
}
}
@@ -1449,7 +1449,7 @@ public class StorageService implements I
*/
public List<InetAddress> getNaturalEndpoints(String table, ByteBuffer key)
{
- return getNaturalEndpoints(table, partitioner_.getToken(key));
+ return getNaturalEndpoints(table, partitioner.getToken(key));
}
/**
@@ -1473,7 +1473,7 @@ public class StorageService implements I
*/
public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer
key)
{
- return getLiveNaturalEndpoints(table, partitioner_.getToken(key));
+ return getLiveNaturalEndpoints(table, partitioner.getToken(key));
}
public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
@@ -1546,7 +1546,7 @@ public class StorageService implements I
FBUtilities.sortSampledKeys(keys, range);
if (keys.size() < 3)
- return partitioner_.midpoint(range.left, range.right);
+ return partitioner.midpoint(range.left, range.right);
else
return keys.get(keys.size() / 2).token;
}
@@ -1670,7 +1670,7 @@ public class StorageService implements I
public void move(String newToken) throws IOException, InterruptedException
{
- move(partitioner_.getTokenFactory().fromString(newToken));
+ move(partitioner.getTokenFactory().fromString(newToken));
}
public void loadBalance() throws IOException, InterruptedException
@@ -1760,7 +1760,7 @@ public class StorageService implements I
{
InetAddress myAddress = FBUtilities.getLocalAddress();
Token localToken = tokenMetadata_.getToken(myAddress);
- Token token = partitioner_.getTokenFactory().fromString(tokenString);
+ Token token = partitioner.getTokenFactory().fromString(tokenString);
InetAddress endpoint = tokenMetadata_.getEndpoint(token);
if (endpoint == null)
@@ -2075,9 +2075,9 @@ public class StorageService implements I
// Never ever do this at home. Used by tests.
IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
{
- IPartitioner oldPartitioner = partitioner_;
- partitioner_ = newPartitioner;
- valueFactory = new VersionedValue.VersionedValueFactory(partitioner_);
+ IPartitioner oldPartitioner = partitioner;
+ partitioner = newPartitioner;
+ valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
return oldPartitioner;
}
@@ -2110,7 +2110,7 @@ public class StorageService implements I
{
List<Token> sortedTokens = new
ArrayList<Token>(getTokenToEndpointMap().keySet());
Collections.sort(sortedTokens);
- return partitioner_.describeOwnership(sortedTokens);
+ return partitioner.describeOwnership(sortedTokens);
}
public List<String> getKeyspaces()
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=1065676&r1=1065675&r2=1065676&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Mon Jan 31 16:30:16 2011
@@ -85,7 +85,7 @@ public class BootStrapperTest extends Cl
Range range = ss.getPrimaryRangeForEndpoint(bootstrapSource);
Token token = StorageService.getPartitioner().midpoint(range.left,
range.right);
assert range.contains(token);
- ss.onChange(bootstrapAddrs[i], ApplicationState.STATUS,
StorageService.valueFactory.bootstrapping(token));
+ ss.onChange(bootstrapAddrs[i], ApplicationState.STATUS,
StorageService.instance.valueFactory.bootstrapping(token));
}
// any further attempt to bootstrap should fail since every node in
the cluster is splitting.
@@ -102,7 +102,7 @@ public class BootStrapperTest extends Cl
// indicate that one of the nodes is done. see if the node it was
bootstrapping from is still available.
Range range = ss.getPrimaryRangeForEndpoint(addrs[2]);
Token token = StorageService.getPartitioner().midpoint(range.left,
range.right);
- ss.onChange(bootstrapAddrs[2], ApplicationState.STATUS,
StorageService.valueFactory.normal(token));
+ ss.onChange(bootstrapAddrs[2], ApplicationState.STATUS,
StorageService.instance.valueFactory.normal(token));
load.put(bootstrapAddrs[2], 0d);
InetAddress addr =
BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load);
assert addr != null && addr.equals(addrs[2]);
@@ -134,7 +134,7 @@ public class BootStrapperTest extends Cl
Range range5 = ss.getPrimaryRangeForEndpoint(five);
Token fakeToken =
StorageService.getPartitioner().midpoint(range5.left, range5.right);
assert range5.contains(fakeToken);
- ss.onChange(myEndpoint, ApplicationState.STATUS,
StorageService.valueFactory.bootstrapping(fakeToken));
+ ss.onChange(myEndpoint, ApplicationState.STATUS,
StorageService.instance.valueFactory.bootstrapping(fakeToken));
tmd = ss.getTokenMetadata();
InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);