Author: brandonwilliams Date: Thu Jul 28 20:27:13 2011 New Revision: 1151990
URL: http://svn.apache.org/viewvc?rev=1151990&view=rev Log: Ec2 snitch with support for multiple regions. Patch by Vijay Parthasarathy, reviewed by brandonwilliams for CASSANDRA-2452 Added: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java (with props) Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1151990&r1=1151989&r2=1151990&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Thu Jul 28 20:27:13 2011 @@ -29,6 +29,7 @@ public enum ApplicationState DC, RACK, RELEASE_VERSION, + INTERNAL_IP, // pad to allow adding new states to existing cluster X1, X2, Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1151990&r1=1151989&r2=1151990&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Thu Jul 28 20:27:13 2011 @@ -159,6 +159,11 @@ public class VersionedValue implements C { return new VersionedValue(FBUtilities.getReleaseVersionString()); } + + public VersionedValue internalIP(String private_ip) + { + return new VersionedValue(private_ip); + } } private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue> Added: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java?rev=1151990&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java Thu Jul 28 20:27:13 2011 @@ -0,0 +1,103 @@ +package org.apache.cassandra.locator; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; + +/** + * 1) Snitch will automatically set the public IP by querying the AWS API + * + * 2) Snitch will set the private IP as a Gossip application state. + * + * 3) Snitch implements IESCS and will reset the connection if it is within the + * same region to communicate via private IP. + * + * Implements Ec2Snitch to inherit its functionality and extend it for + * Multi-Region. + * + * Operational: All the nodes in this cluster needs to be able to (modify the + * Security group settings in AWS) communicate via Public IP's. + */ +public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateChangeSubscriber +{ + private static final String PUBLIC_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/public-ipv4"; + private static final String PRIVATE_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/local-ipv4"; + private final InetAddress public_ip; + private final String private_ip; + + public Ec2MultiRegionSnitch() throws IOException, ConfigurationException + { + super(); + public_ip = InetAddress.getByName(awsApiCall(PUBLIC_IP_QUERY_URL)); + logger.info("EC2Snitch using publicIP as identifier: " + public_ip); + private_ip = awsApiCall(PRIVATE_IP_QUERY_URL); + // use the Public IP to broadcast Address to other nodes. + DatabaseDescriptor.setBroadcastAddress(public_ip); + } + + @Override + public void onJoin(InetAddress endpoint, EndpointState epState) + { + if (epState.getApplicationState(ApplicationState.INTERNAL_IP) != null) + reConnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP)); + } + + @Override + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) + { + if (state == ApplicationState.INTERNAL_IP) + reConnect(endpoint, value); + } + + @Override + public void onAlive(InetAddress endpoint, EndpointState state) + { + if (state.getApplicationState(ApplicationState.INTERNAL_IP) != null) + reConnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP)); + } + + @Override + public void onDead(InetAddress endpoint, EndpointState state) + { + // do nothing + } + + @Override + public void onRemove(InetAddress endpoint) + { + // do nothing. + } + + private void reConnect(InetAddress endpoint, VersionedValue versionedValue) + { + if (!getDatacenter(endpoint).equals(getDatacenter(public_ip))) + return; // do nothing return back... + + try + { + InetAddress remoteIP = InetAddress.getByName(versionedValue.value); + MessagingService.instance().getConnectionPool(endpoint).reset(remoteIP); + logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", remoteIP, endpoint)); + } catch (UnknownHostException e) + { + logger.error("Error in getting the IP address resolved: ", e); + } + } + + public void gossiperStarting() + { + super.gossiperStarting(); + Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(private_ip)); + Gossiper.instance.register(this); + } +} Propchange: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1151990&r1=1151989&r2=1151990&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Thu Jul 28 20:27:13 2011 @@ -44,10 +44,10 @@ public class OutboundTcpConnection exten { private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class); - private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0); + public static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0); private static final int OPEN_RETRY_DELAY = 100; // ms between retries - private final InetAddress endpoint; + private InetAddress endpoint; private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>(); private DataOutputStream output; private Socket socket; @@ -56,7 +56,12 @@ public class OutboundTcpConnection exten public OutboundTcpConnection(InetAddress remoteEp) { super("WRITE-" + remoteEp); - this.endpoint = remoteEp; + setEndPoint(remoteEp); + } + + public void setEndPoint(InetAddress remoteEndPoint) + { + this.endpoint = remoteEndPoint; } public void write(ByteBuffer buffer) Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1151990&r1=1151989&r2=1151990&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Thu Jul 28 20:27:13 2011 @@ -22,7 +22,7 @@ import java.net.InetAddress; import org.apache.cassandra.concurrent.Stage; -class OutboundTcpConnectionPool +public class OutboundTcpConnectionPool { public final OutboundTcpConnection cmdCon; public final OutboundTcpConnection ackCon; @@ -52,4 +52,12 @@ class OutboundTcpConnectionPool for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon }) con.closeSocket(); } + + public void reset(InetAddress remoteEP) + { + ackCon.setEndPoint(remoteEP); + ackCon.write(OutboundTcpConnection.CLOSE_SENTINEL); + cmdCon.setEndPoint(remoteEP); + cmdCon.write(OutboundTcpConnection.CLOSE_SENTINEL); + } }