Updated Branches: refs/heads/trunk 088d965ce -> db4da73e5
Save EC2Snitch topology information in system table - take 2 patch by Vijay; reviewed by Jason Brown for CASSANDRA-5171 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db4da73e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db4da73e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db4da73e Branch: refs/heads/trunk Commit: db4da73e58424de64d56d61a7bca10d33fe213d1 Parents: 088d965 Author: Vijay Parthasarathy <[email protected]> Authored: Tue Jul 9 19:08:42 2013 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Tue Jul 9 19:08:42 2013 -0700 ---------------------------------------------------------------------- .../org/apache/cassandra/config/CFMetaData.java | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 16 ++++++++ .../org/apache/cassandra/locator/Ec2Snitch.java | 15 ++++++++ .../net/OutboundTcpConnectionPool.java | 4 ++ .../Keyspace1-Standard1-ic-0-Summary.db | Bin 194 -> 202 bytes test/data/serialization/2.0/db.RowMutation.bin | Bin 3599 -> 3599 bytes .../apache/cassandra/locator/EC2SnitchTest.java | 38 +++++++++++++++++++ 7 files changed, 74 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index f131cda..fe984f9 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -185,6 +185,7 @@ public final class CFMetaData + "schema_version uuid," + "release_version text," + "rpc_address inet," + + "preferred_ip inet," + "data_center text," + "rack text" + ") WITH COMMENT='known peers in the cluster'"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index e686f16..ba8f63a 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -269,6 +269,13 @@ public class SystemKeyspace forceBlockingFlush(PEERS_CF); } + public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip) + { + String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')"; + processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress())); + forceBlockingFlush(PEERS_CF); + } + public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value) { if (ep.equals(FBUtilities.getBroadcastAddress())) @@ -393,6 +400,15 @@ public class SystemKeyspace return hostIdMap; } + public static InetAddress getPreferredIP(InetAddress ep) + { + String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'"; + UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress())); + if (!result.isEmpty() && result.one().has("preferred_ip")) + return result.one().getInetAddress("preferred_ip"); + return null; + } + /** * Return a map of IP addresses containing a map of dc and rack info */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/locator/Ec2Snitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java index a28e2a6..5dc8638 100644 --- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java +++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.URL; +import java.util.Map; import com.google.common.base.Charsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; @@ -44,6 +46,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"; private static final String DEFAULT_DC = "UNKNOWN-DC"; private static final String DEFAULT_RACK = "UNKNOWN-RACK"; + private Map<InetAddress, Map<String, String>> savedEndpoints; protected String ec2zone; protected String ec2region; @@ -93,7 +96,13 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch return ec2zone; EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.RACK) == null) + { + if (savedEndpoints == null) + savedEndpoints = SystemKeyspace.loadDcRackInfo(); + if (savedEndpoints.containsKey(endpoint)) + return savedEndpoints.get(endpoint).get("rack"); return DEFAULT_RACK; + } return state.getApplicationState(ApplicationState.RACK).value; } @@ -103,7 +112,13 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch return ec2region; EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.DC) == null) + { + if (savedEndpoints == null) + savedEndpoints = SystemKeyspace.loadDcRackInfo(); + if (savedEndpoints.containsKey(endpoint)) + return savedEndpoints.get(endpoint).get("data_center"); return DEFAULT_DC; + } return state.getApplicationState(ApplicationState.DC).value; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 86476b1..4efb507 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.metrics.ConnectionMetrics; import org.apache.cassandra.security.SSLFactory; @@ -45,6 +46,8 @@ public class OutboundTcpConnectionPool OutboundTcpConnectionPool(InetAddress remoteEp) { id = remoteEp; + resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp); + cmdCon = new OutboundTcpConnection(this); cmdCon.start(); ackCon = new OutboundTcpConnection(this); @@ -87,6 +90,7 @@ public class OutboundTcpConnectionPool */ public void reset(InetAddress remoteEP) { + SystemKeyspace.updatePreferredIP(id, remoteEP); resetedEndpoint = remoteEP; for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) conn.softCloseSocket(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db b/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db index e93acef..c1bc2e2 100644 Binary files a/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db and b/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/test/data/serialization/2.0/db.RowMutation.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin index c9fcc67..4b525d3 100644 Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java index db79a73..456d3ac 100644 --- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java @@ -25,19 +25,35 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Map; +import junit.framework.Assert; + +import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.OutboundTcpConnectionPool; import org.apache.cassandra.service.StorageService; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; public class EC2SnitchTest { private static String az; + @BeforeClass + public static void setup() throws Exception + { + SchemaLoader.mkdirs(); + SchemaLoader.cleanup(); + StorageService.instance.initServer(0); + } + private class TestEC2Snitch extends Ec2Snitch { public TestEC2Snitch() throws IOException, ConfigurationException @@ -81,4 +97,26 @@ public class EC2SnitchTest assertEquals("us-east-2", snitch.getDatacenter(local)); assertEquals("2d", snitch.getRack(local)); } + + @Test + public void testEc2MRSnitch() throws UnknownHostException + { + InetAddress me = InetAddress.getByName("127.0.0.2"); + InetAddress com_ip = InetAddress.getByName("127.0.0.3"); + + OutboundTcpConnectionPool pool = MessagingService.instance().getConnectionPool(me); + Assert.assertEquals(me, pool.endPoint()); + pool.reset(com_ip); + Assert.assertEquals(com_ip, pool.endPoint()); + + MessagingService.instance().destroyConnectionPool(me); + pool = MessagingService.instance().getConnectionPool(me); + Assert.assertEquals(com_ip, pool.endPoint()); + } + + @AfterClass + public static void tearDown() + { + StorageService.instance.stopClient(); + } }
