Updated Branches:
refs/heads/trunk 449573622 -> 4f2f97944
Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/service/StorageProxy.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f2f9794
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f2f9794
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f2f9794
Branch: refs/heads/trunk
Commit: 4f2f979446b8ccb071d11f81c59bf09ffdefb871
Parents: 4495736 5267112
Author: Sylvain Lebresne <[email protected]>
Authored: Wed Jan 16 18:48:11 2013 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed Jan 16 18:48:11 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 12 +
.../cassandra/config/DatabaseDescriptor.java | 19 ++
.../cassandra/config/ReadRepairDecision.java | 23 ++
.../org/apache/cassandra/db/ConsistencyLevel.java | 174 ++++++++++++++-
.../org/apache/cassandra/dht/AbstractBounds.java | 2 +
src/java/org/apache/cassandra/dht/Bounds.java | 5 +
.../org/apache/cassandra/dht/ExcludingBounds.java | 5 +
.../cassandra/dht/IncludingExcludingBounds.java | 5 +
src/java/org/apache/cassandra/dht/Range.java | 5 +
.../cassandra/locator/AbstractEndpointSnitch.java | 24 ++
.../locator/AbstractReplicationStrategy.java | 28 ++-
.../cassandra/locator/DynamicEndpointSnitch.java | 30 +++
.../apache/cassandra/locator/IEndpointSnitch.java | 8 +-
.../apache/cassandra/locator/LocalStrategy.java | 1 +
.../cassandra/locator/NetworkTopologyStrategy.java | 1 +
.../locator/OldNetworkTopologyStrategy.java | 1 +
.../apache/cassandra/locator/SimpleStrategy.java | 1 +
.../service/AbstractWriteResponseHandler.java | 38 ++-
.../cassandra/service/DatacenterReadCallback.java | 104 ---------
.../DatacenterSyncWriteResponseHandler.java | 45 +----
.../service/DatacenterWriteResponseHandler.java | 28 +--
.../org/apache/cassandra/service/ReadCallback.java | 93 ++-------
.../org/apache/cassandra/service/StorageProxy.java | 121 ++++++++---
.../apache/cassandra/service/StorageService.java | 6 +-
.../cassandra/service/WriteResponseHandler.java | 45 +---
.../ReplicationStrategyEndpointCacheTest.java | 2 +-
.../cassandra/locator/SimpleStrategyTest.java | 10 -
28 files changed, 481 insertions(+), 356 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9e0557f,7c33203..818162f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -87,9 -86,9 +87,10 @@@ public class DatabaseDescripto
private static long keyCacheSizeInMB;
private static IRowCacheProvider rowCacheProvider;
+ private static IAllocator memoryAllocator;
private static String localDC;
+ private static Comparator<InetAddress> localComparator;
/**
* Inspect the classpath to find storage configuration file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 3809580,df09171..467a382
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -321,8 -321,13 +322,13 @@@ public class StorageProxy implements St
private static void asyncRemoveFromBatchlog(Collection<InetAddress>
endpoints, UUID uuid)
{
RowMutation rm = new RowMutation(Table.SYSTEM_KS,
UUIDType.instance.decompose(uuid));
- rm.delete(new QueryPath(SystemTable.BATCHLOG_CF),
FBUtilities.timestampMicros());
+ rm.delete(SystemTable.BATCHLOG_CF, FBUtilities.timestampMicros());
- AbstractWriteResponseHandler handler = new
WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(),
ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE);
+ AbstractWriteResponseHandler handler = new
WriteResponseHandler(endpoints,
+
Collections.<InetAddress>emptyList(),
+
ConsistencyLevel.ANY,
+
Table.open(Table.SYSTEM_KS),
+ null,
+
WriteType.SIMPLE);
updateBatchlog(rm, endpoints, handler);
}
@@@ -1096,10 -1120,54 +1121,53 @@@
int cql3RowCount = 0;
rows = new ArrayList<Row>();
List<AbstractBounds<RowPosition>> ranges =
getRestrictedRanges(command.range);
- for (AbstractBounds<RowPosition> range : ranges)
+ int i = 0;
+ AbstractBounds<RowPosition> nextRange = null;
+ List<InetAddress> nextEndpoints = null;
+ List<InetAddress> nextFilteredEndpoints = null;
+ while (i < ranges.size())
{
+ AbstractBounds<RowPosition> range = nextRange == null
+ ? ranges.get(i)
+ : nextRange;
+ List<InetAddress> liveEndpoints = nextEndpoints == null
+ ?
getLiveSortedEndpoints(table, range.right)
+ : nextEndpoints;
+ List<InetAddress> filteredEndpoints = nextFilteredEndpoints
== null
+ ?
consistency_level.filterForQuery(table, liveEndpoints)
+ : nextFilteredEndpoints;
+ ++i;
+
+ // getRestrictedRange has broken the queried range into
per-[vnode] token ranges, but this doesn't take
+ // the replication factor into account. If the intersection
of live endpoints for 2 consecutive ranges
+ // still meets the CL requirements, then we can merge both
ranges into the same RangeSliceCommand.
+ while (i < ranges.size())
+ {
+ nextRange = ranges.get(i);
+ nextEndpoints = getLiveSortedEndpoints(table,
nextRange.right);
+ nextFilteredEndpoints =
consistency_level.filterForQuery(table, liveEndpoints);
+
+ List<InetAddress> merged = intersection(liveEndpoints,
nextEndpoints);
+
+ // Check if there is enough endpoint for the merge to be
possible.
+ if (!consistency_level.isSufficientLiveNodes(table,
merged))
+ break;
+
+ List<InetAddress> filteredMerged =
consistency_level.filterForQuery(table, merged);
+
+ // Estimate whether merging will be a win or not
+ if
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged,
filteredEndpoints, nextFilteredEndpoints))
+ break;
+
+ // If we get there, merge this range and the next one
+ range = range.withNewRight(nextRange.right);
+ liveEndpoints = merged;
+ filteredEndpoints = filteredMerged;
+ ++i;
+ }
+
RangeSliceCommand nodeCmd = new
RangeSliceCommand(command.keyspace,
command.column_family,
-
command.super_column,
commandPredicate,
range,
command.row_filter,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f2f9794/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------