Updated Branches: refs/heads/trunk 51e6c7c0a -> e863c2b71
Use rpc_address for binary protocol and change default port patch by slebresne; reviewed by jbellis for CASSANDRA-4751 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e863c2b7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e863c2b7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e863c2b7 Branch: refs/heads/trunk Commit: e863c2b71cbf917dde2d0c5f236b749f0070c1dc Parents: 51e6c7c Author: Sylvain Lebresne <[email protected]> Authored: Thu Oct 4 11:11:29 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Thu Oct 4 11:11:29 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + conf/cassandra.yaml | 7 ++-- src/java/org/apache/cassandra/config/Config.java | 1 - .../cassandra/config/DatabaseDescriptor.java | 20 +---------- .../org/apache/cassandra/transport/Server.java | 29 +++++++++++---- 5 files changed, 28 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e863c2b7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d86d890..32d76b4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,8 @@ * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738) * Add support for multiple column family outputs in CFOF (CASSANDRA-4208) * Support repairing only the local DC nodes (CASSANDRA-4747) + * Use rpc_address for binary protocol and change default port (CASSANDRA-4751) + 1.2-beta1 * add atomic_batch_mutate (CASSANDRA-4542, -4635) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e863c2b7/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index cbcb7b2..f98f9f0 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -308,12 +308,11 @@ 
listen_address: localhost # Whether to start the native transport server. # Currently, only the thrift server is started by default because the native # transport is considered beta. +# Please note that the address on which the native transport is bound is the +# same as the rpc_address. The port however is different and specified below. start_native_transport: false -# The address to bind the CQL native transport to. The same remarks than for -# rpc_address applies. -native_transport_address: localhost # port for the CQL native transport to listen for clients on -native_transport_port: 8000 +native_transport_port: 9042 # The maximum of thread handling requests. The meaning is the same than # rpc_max_threads. The default is unlimited. #native_transport_max_threads: 2048 http://git-wip-us.apache.org/repos/asf/cassandra/blob/e863c2b7/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 4f551ad..90746c3 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -83,7 +83,6 @@ public class Config public Integer rpc_recv_buff_size_in_bytes; public Boolean start_native_transport = false; - public String native_transport_address; public Integer native_transport_port = 8000; public Integer native_transport_max_threads = Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e863c2b7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 91bd9b6..cc8f07f 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ 
-64,7 +64,6 @@ public class DatabaseDescriptor private static InetAddress listenAddress; // leave null so we can fall through to getLocalHost private static InetAddress broadcastAddress; private static InetAddress rpcAddress; - private static InetAddress nativeTransportAddress; private static SeedProvider seedProvider; /* Hashing strategy Random or OPHF */ @@ -316,23 +315,6 @@ public class DatabaseDescriptor rpcAddress = FBUtilities.getLocalAddress(); } - /* Local IP or hostname to bind RPC server to */ - if (conf.native_transport_address != null) - { - try - { - nativeTransportAddress = InetAddress.getByName(conf.native_transport_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown host in native_transport_address" + conf.native_transport_address); - } - } - else - { - nativeTransportAddress = FBUtilities.getLocalAddress(); - } - if (conf.thrift_framed_transport_size_in_mb <= 0) throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive"); @@ -986,7 +968,7 @@ public class DatabaseDescriptor public static InetAddress getNativeTransportAddress() { - return nativeTransportAddress; + return getRpcAddress(); } public static int getNativeTransportPort() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e863c2b7/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 74885b7..ab91b19 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -19,6 +19,7 @@ package org.apache.cassandra.transport; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.EnumMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,6 +37,7 @@ import 
org.slf4j.LoggerFactory; import org.apache.cassandra.gms.*; import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.EventMessage; public class Server implements CassandraDaemon.Server @@ -206,11 +208,24 @@ public class Server implements CassandraDaemon.Server this.server = server; } + private InetAddress getRpcAddress(InetAddress endpoint) + { + try + { + return InetAddress.getByName(StorageService.instance.getRpcaddress(endpoint)); + } + catch (UnknownHostException e) + { + // That should not happen, so log an error, but return the + // endpoint address since there's a good chance this is right + logger.error("Problem retrieving RPC address for " + endpoint, e); + return endpoint; + } + } + public void onJoin(InetAddress endpoint, EndpointState epState) { - // TODO: we don't gossip the native protocol ip/port yet, so use the - // endpoint address and ip on which this server is listening instead. 
- server.connectionTracker.send(Event.TopologyChange.newNode(endpoint, server.socket.getPort())); + server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) @@ -219,22 +234,22 @@ public class Server implements CassandraDaemon.Server public void onAlive(InetAddress endpoint, EndpointState state) { - server.connectionTracker.send(Event.StatusChange.nodeUp(endpoint, server.socket.getPort())); + server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); } public void onDead(InetAddress endpoint, EndpointState state) { - server.connectionTracker.send(Event.StatusChange.nodeDown(endpoint, server.socket.getPort())); + server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); } public void onRemove(InetAddress endpoint) { - server.connectionTracker.send(Event.TopologyChange.removedNode(endpoint, server.socket.getPort())); + server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onRestart(InetAddress endpoint, EndpointState state) { - server.connectionTracker.send(Event.StatusChange.nodeUp(endpoint, server.socket.getPort())); + server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); } } }
