Author: jbellis
Date: Tue Apr 6 14:58:17 2010
New Revision: 931172
URL: http://svn.apache.org/viewvc?rev=931172&view=rev
Log:
merge from 0.6
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 6 14:58:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-929775
+/cassandra/branches/cassandra-0.6:922689-931169
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5:888872-915439
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Apr 6 14:58:17 2010
@@ -11,6 +11,12 @@ dev
* fix NPE in sstable2json when no excluded keys are given (CASSANDRA-934)
* keep the replica set constant throughout the read repair process
(CASSANDRA-937)
+ * allow querying getAllRanges with empty token list (CASSANDRA-933)
+ * fix command line arguments inversion in clustertool (CASSANDRA-942)
+ * fix race condition that could trigger a false-positive assertion
+ during post-flush discard of old commitlog segments (CASSANDRA-936)
+ * fix neighbor calculation for anti-entropy repair (CASSANDRA-924)
+ * perform repair even for small entropy differences (CASSANDRA-924)
0.6.0-RC1
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 6 14:58:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-929775
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-931169
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 6 14:58:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-929775
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-931169
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 6 14:58:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-929775
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-931169
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 6 14:58:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-929775
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-931169
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 6 14:58:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-929775
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-931169
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Tue Apr 6 14:58:17 2010
@@ -387,10 +387,10 @@ public class CommitLog
/*
* log replay assumes that we only have to look at entries past the last
- * flush position, so verify that this flush happens after the last.
+ * flush position, so verify that this flush happens after the last. See CASSANDRA-936
*/
- assert context.position >
context.getSegment().getHeader().getPosition(id) : "discard called on obsolete
context " + context;
-
+ assert context.position >=
context.getSegment().getHeader().getPosition(id)
+ : "discard at " + context + " is not after last flush at " +
context.getSegment().getHeader().getPosition(id);
/*
* Loop through all the commit log files in the history. Now process
* all files that are older than the one in the context. For each of
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Tue Apr 6 14:58:17 2010
@@ -45,9 +45,6 @@ import org.apache.cassandra.utils.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Collections2;
-import com.google.common.base.Predicates;
-
/**
* AntiEntropyService encapsulates "validating" (hashing) individual column families,
* exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
@@ -143,12 +140,18 @@ public class AntiEntropyService
/**
* Return all of the neighbors with whom we share data.
*/
- private static Collection<InetAddress> getNeighbors(String table)
+ public static Set<InetAddress> getNeighbors(String table)
{
- InetAddress local = FBUtilities.getLocalAddress();
StorageService ss = StorageService.instance;
- return Collections2.filter(ss.getNaturalEndpoints(table,
ss.getLocalToken()),
- Predicates.not(Predicates.equalTo(local)));
+ Set<InetAddress> neighbors = new HashSet<InetAddress>();
+ Map<Range, List<InetAddress>> replicaSets =
ss.getRangeToAddressMap(table);
+ for (Range range : ss.getLocalRanges(table))
+ {
+ // for every range stored locally (replica or original) collect neighbors storing copies
+ neighbors.addAll(replicaSets.get(range));
+ }
+ neighbors.remove(FBUtilities.getLocalAddress());
+ return neighbors;
}
/**
@@ -581,11 +584,8 @@ public class AntiEntropyService
logger.debug("Endpoints " + local + " and " + remote + "
are consistent for " + cf);
return;
}
-
- if (difference < 0.05)
- performRangeRepair();
- else
- performStreamingRepair();
+
+ performStreamingRepair();
}
catch(IOException e)
{
@@ -606,17 +606,6 @@ public class AntiEntropyService
}
/**
- * Sends our list of differences to the remote endpoint using read
- * repairs via the query API.
- */
- void performRangeRepair() throws IOException
- {
- logger.info("Performing range read repair of " +
differences.size() + " ranges for " + cf);
- // FIXME
- logger.debug("Finished range read repair for " + cf);
- }
-
- /**
* Sends our list of differences to the remote endpoint using the
* Streaming API.
*/
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Apr 6 14:58:17 2010
@@ -1058,7 +1058,8 @@ public class StorageService implements I
{
// request that all relevant endpoints generate trees
final MessagingService ms = MessagingService.instance;
- final List<InetAddress> endpoints = getNaturalEndpoints(tableName,
getLocalToken());
+ final Set<InetAddress> endpoints =
AntiEntropyService.getNeighbors(tableName);
+ endpoints.add(FBUtilities.getLocalAddress());
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName,
columnFamilies))
{
Message request = TreeRequestVerbHandler.makeVerb(tableName,
cfStore.getColumnFamilyName());
@@ -1120,6 +1121,8 @@ public class StorageService implements I
if (logger_.isDebugEnabled())
logger_.debug("computing ranges for " +
StringUtils.join(sortedTokens, ", "));
+ if (sortedTokens.isEmpty())
+ return Collections.emptyList();
List<Range> ranges = new ArrayList<Range>();
int size = sortedTokens.size();
for (int i = 1; i < size; ++i)
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/ClusterCmd.java Tue Apr
6 14:58:17 2010
@@ -50,7 +50,7 @@ public class ClusterCmd {
Option optHost = new Option(HOST_OPT_SHORT, HOST_OPT_LONG, true, "node
hostname or ip address");
optHost.setRequired(true);
options.addOption(optHost);
- options.addOption(PORT_OPT_SHORT, PORT_OPT_LONG, true, "remote jmx
agent port number");
+ options.addOption(PORT_OPT_SHORT, PORT_OPT_LONG, true, "remote jmx
agent port number (defaults to " + defaultPort + ")");
}
/**
@@ -139,14 +139,14 @@ public class ClusterCmd {
{
HelpFormatter hf = new HelpFormatter();
String header = String.format(
- "%nAvailable commands: get_endpoints [key], global_snapshot
[name], clear_global_snapshot");
+ "%nAvailable commands: get_endpoints [keyspace] [key],
global_snapshot [name], clear_global_snapshot");
String usage = String.format("java %s -host <arg> <command>%n",
ClusterCmd.class.getName());
hf.printHelp(usage, "", options, header);
}
- public void printEndPoints(String key, String table)
+ public void printEndPoints(String keyspace, String key)
{
- List<InetAddress> endpoints = probe.getEndPoints(key, table);
+ List<InetAddress> endpoints = probe.getEndPoints(keyspace, key);
System.out.println(String.format("%-17s: %s", "Key", key));
System.out.println(String.format("%-17s: %s", "Endpoints", endpoints));
}
@@ -256,7 +256,9 @@ public class ClusterCmd {
{
if (arguments.length <= 2)
{
- System.err.println("missing key and/or table argument");
+ System.err.println("missing keyspace and/or key argument");
+ ClusterCmd.printUsage();
+ System.exit(1);
}
clusterCmd.printEndPoints(arguments[1], arguments[2]);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Apr
6 14:58:17 2010
@@ -400,9 +400,9 @@ public class NodeProbe
}
}
- public List<InetAddress> getEndPoints(String key, String table)
+ public List<InetAddress> getEndPoints(String keyspace, String key)
{
- return ssProxy.getNaturalEndpoints(key, table);
+ return ssProxy.getNaturalEndpoints(keyspace, key);
}
public Set<InetAddress> getStreamDestinations()
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Tue Apr 6 14:58:17 2010
@@ -31,6 +31,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator.CompactedRow;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import static org.apache.cassandra.service.AntiEntropyService.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -41,40 +42,39 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.Util;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class AntiEntropyServiceTest extends CleanupHelper
{
// table and column family to test against
- public AntiEntropyService aes;
+ public static AntiEntropyService aes;
public static String tablename;
public static String cfname;
public static InetAddress LOCAL, REMOTE;
- private static boolean initialized;
+ @BeforeClass
+ public static void prepareClass() throws Exception
+ {
+ LOCAL = FBUtilities.getLocalAddress();
+ tablename = "Keyspace4";
+ StorageService.instance.initServer();
+ // generate a fake endpoint for which we can spoof receiving/sending trees
+ REMOTE = InetAddress.getByName("127.0.0.2");
+ cfname = Table.open(tablename).getColumnFamilies().iterator().next();
+ }
@Before
public void prepare() throws Exception
{
- if (!initialized)
- {
- LOCAL = FBUtilities.getLocalAddress();
- tablename = "Keyspace4";
-
- StorageService.instance.initServer();
- // generate a fake endpoint for which we can spoof receiving/sending trees
- TokenMetadata tmd = StorageService.instance.getTokenMetadata();
- IPartitioner part = StorageService.getPartitioner();
- REMOTE = InetAddress.getByName("127.0.0.2");
- tmd.updateNormalToken(part.getMinimumToken(), REMOTE);
- assert tmd.isMember(REMOTE);
-
- cfname =
Table.open(tablename).getColumnFamilies().iterator().next();
- initialized = true;
- }
aes = AntiEntropyService.instance;
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), LOCAL);
+
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(),
REMOTE);
+ assert tmd.isMember(REMOTE);
}
@Test
@@ -202,6 +202,32 @@ public class AntiEntropyServiceTest exte
}
@Test
+ public void testGetNeighborsPlusOne() throws Throwable
+ {
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + 1 +
DatabaseDescriptor.getReplicationFactor(tablename));
+ expected.remove(FBUtilities.getLocalAddress());
+ assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwo() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(1 + (2 *
DatabaseDescriptor.getReplicationFactor(tablename)));
+ AbstractReplicationStrategy ars =
StorageService.instance.getReplicationStrategy(tablename);
+ Set<InetAddress> expected = new HashSet<InetAddress>();
+ for (Range replicaRange :
ars.getAddressRanges(tablename).get(FBUtilities.getLocalAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd,
tablename).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getLocalAddress());
+ assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+ }
+
+ @Test
public void testDifferencer() throws Throwable
{
// generate a tree
@@ -232,6 +258,19 @@ public class AntiEntropyServiceTest exte
assertEquals("Wrong differing range", changed,
diff.differences.get(0));
}
+ Set<InetAddress> addTokens(int max) throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ Set<InetAddress> endpoints = new HashSet<InetAddress>();
+ for (int i = 1; i < max; i++)
+ {
+ InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
+
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(),
endpoint);
+ endpoints.add(endpoint);
+ }
+ return endpoints;
+ }
+
Future<Object> flushAES()
{
return StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(new
Callable<Object>()
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java?rev=931172&r1=931171&r2=931172&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
Tue Apr 6 14:58:17 2010
@@ -21,12 +21,16 @@ package org.apache.cassandra.service;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import org.junit.Test;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.config.DatabaseDescriptor;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -49,4 +53,12 @@ public class StorageServiceServerTest
//StorageService.instance.decommission();
StorageService.instance.stopClient();
}
-}
\ No newline at end of file
+
+ @Test
+ public void testGetAllRangesEmpty()
+ {
+ List<Token> toks = Collections.emptyList();
+ assertEquals(Collections.emptyList(),
StorageService.instance.getAllRanges(toks));
+ }
+}
+