Author: jbellis
Date: Tue Nov 3 02:23:53 2009
New Revision: 832266
URL: http://svn.apache.org/viewvc?rev=832266&view=rev
Log:
CASSANDRA-522
convert replication strategy methods to multimap
patch by jbellis for CASSANDRA-522
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Nov 3 02:23:53 2009
@@ -289,7 +289,7 @@
* This method forces a compaction of the SSTables on disk. We wait
* for the process to complete by waiting on a future pointer.
*/
- List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
+ List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
{
assert ranges != null;
Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target);
@@ -717,7 +717,7 @@
return maxFile;
}
- List<SSTableReader> doAntiCompaction(List<Range> ranges, InetAddress target) throws IOException
+ List<SSTableReader> doAntiCompaction(Collection<Range> ranges, InetAddress target) throws IOException
{
return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue Nov 3 02:23:53 2009
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.List;
+import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -74,10 +75,10 @@
static class FileCompactor2 implements Callable<List<SSTableReader>>
{
private ColumnFamilyStore columnFamilyStore_;
- private List<Range> ranges_;
+ private Collection<Range> ranges_;
private InetAddress target_;
- FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
+ FileCompactor2(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
{
columnFamilyStore_ = columnFamilyStore;
ranges_ = ranges;
@@ -187,7 +188,7 @@
compactor_.submit(new CleanupCompactor(columnFamilyStore));
}
- public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
+ public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
{
return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target) );
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Nov 3 02:23:53 2009
@@ -301,7 +301,7 @@
* do a complete compaction since we can figure out based on the ranges
* whether the files need to be split.
*/
- public List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
+ public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
{
List<SSTableReader> allResults = new ArrayList<SSTableReader>();
Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Nov 3 02:23:53 2009
@@ -18,13 +18,7 @@
package org.apache.cassandra.dht;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
+ import java.util.*;
import java.util.concurrent.locks.Condition;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -54,6 +48,9 @@
import org.apache.cassandra.io.SSTableWriter;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
+ import com.google.common.collect.Multimap;
+ import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.ArrayListMultimap;
/**
@@ -95,11 +92,11 @@
{
public void run()
{
- Map<Range, Set<InetAddress>> rangesWithSourceTarget = getRangesWithSources();
+ Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources();
if (logger.isDebugEnabled())
logger.debug("Beginning bootstrap process for " + address + " ...");
/* Send messages to respective folks to stream data over to me */
- for (Map.Entry<InetAddress, List<Range>> entry : getWorkMap(rangesWithSourceTarget).entrySet())
+ for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
{
InetAddress source = entry.getKey();
BootstrapMetadata bsMetadata = new BootstrapMetadata(address, entry.getValue());
@@ -147,24 +144,24 @@
}
}
- Map<Range, Set<InetAddress>> getRangesWithSources()
+ Multimap<Range, InetAddress> getRangesWithSources()
{
Map<Token, InetAddress> map = tokenMetadata.cloneTokenEndPointMap();
assert map.size() > 0;
map.put(token, address);
- Set<Range> myRanges = replicationStrategy.getAddressRanges(map).get(address);
+ Collection<Range> myRanges = replicationStrategy.getAddressRanges(map).get(address);
map.remove(token);
- Map<Range, Set<InetAddress>> myRangeAddresses = new HashMap<Range, Set<InetAddress>>();
- Map<Range, Set<InetAddress>> rangeAddresses = replicationStrategy.getRangeAddresses(map);
+ Multimap<Range, InetAddress> myRangeAddresses = HashMultimap.create();
+ Multimap<Range, InetAddress> rangeAddresses = replicationStrategy.getRangeAddresses(map);
for (Range range : rangeAddresses.keySet())
{
for (Range myRange : myRanges)
{
if (range.contains(myRange.right()))
{
- assert !myRangeAddresses.containsKey(myRange);
- myRangeAddresses.put(myRange, rangeAddresses.get(range));
+ myRangeAddresses.putAll(myRange, rangeAddresses.get(range));
+ break;
}
}
}
@@ -179,18 +176,18 @@
return btc.getToken();
}
- static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget)
+ static Multimap<InetAddress, Range> getWorkMap(Multimap<Range, InetAddress> rangesWithSourceTarget)
{
return getWorkMap(rangesWithSourceTarget, FailureDetector.instance());
}
- static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget, IFailureDetector failureDetector)
+ static Multimap<InetAddress, Range> getWorkMap(Multimap<Range, InetAddress> rangesWithSourceTarget, IFailureDetector failureDetector)
{
/*
* Map whose key is the source node and the value is a map whose key is the
* target and value is the list of ranges to be sent to it.
*/
- Map<InetAddress, List<Range>> sources = new HashMap<InetAddress, List<Range>>();
+ Multimap<InetAddress, Range> sources = ArrayListMultimap.create();
// TODO look for contiguous ranges and map them to the same source
for (Range range : rangesWithSourceTarget.keySet())
@@ -199,13 +196,7 @@
{
if (failureDetector.isAlive(source))
{
- List<Range> ranges = sources.get(source);
- if (ranges == null)
- {
- ranges = new ArrayList<Range>();
- sources.put(source, ranges);
- }
- ranges.add(range);
+ sources.put(source, range);
break;
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java Tue Nov 3 02:23:53 2009
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Collection;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
@@ -51,9 +52,9 @@
}
protected InetAddress target_;
- protected List<Range> ranges_;
+ protected Collection<Range> ranges_;
- BootstrapMetadata(InetAddress target, List<Range> ranges)
+ BootstrapMetadata(InetAddress target, Collection<Range> ranges)
{
target_ = target;
ranges_ = ranges;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Tue Nov 3 02:23:53 2009
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Collection;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
@@ -93,7 +94,7 @@
* locally for each range and then stream them using
* the Bootstrap protocol to the target endpoint.
*/
- private void doTransfer(InetAddress target, List<Range> ranges) throws IOException
+ private void doTransfer(InetAddress target, Collection<Range> ranges) throws IOException
{
if ( ranges.size() == 0 )
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Tue Nov 3 02:23:53 2009
@@ -23,8 +23,8 @@
import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -33,7 +33,6 @@
import org.apache.cassandra.service.InvalidRequestException;
import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
/**
* This class contains a helper method that will be used by
@@ -177,46 +176,39 @@
// TODO this is pretty inefficient. also the inverse (getRangeAddresses) below.
// fixing this probably requires merging tokenmetadata into replicationstrategy, so we can cache/invalidate cleanly
- public Map<InetAddress, Set<Range>> getAddressRanges(Map<Token, InetAddress> tokenMap)
+ public Multimap<InetAddress, Range> getAddressRanges(Map<Token, InetAddress> metadata)
{
- Map<InetAddress, Set<Range>> map = new HashMap<InetAddress, Set<Range>>();
-
- for (InetAddress ep : tokenMap.values())
- {
- map.put(ep, new HashSet<Range>());
- }
+ Multimap<InetAddress, Range> map = HashMultimap.create();
- for (Token token : tokenMap.keySet())
+ for (Token token : metadata.keySet())
{
- Range range = getPrimaryRangeFor(token, tokenMap);
- for (InetAddress ep : getNaturalEndpoints(token, tokenMap))
+ Range range = getPrimaryRangeFor(token, metadata);
+ for (InetAddress ep : getNaturalEndpoints(token, metadata))
{
- map.get(ep).add(range);
+ map.put(ep, range);
}
}
return map;
}
- public Map<Range, Set<InetAddress>> getRangeAddresses(Map<Token, InetAddress> tokenMap)
+ public Multimap<Range, InetAddress> getRangeAddresses(Map<Token, InetAddress> metadata)
{
- Map<Range, Set<InetAddress>> map = new HashMap<Range, Set<InetAddress>>();
+ Multimap<Range, InetAddress> map = HashMultimap.create();
- for (Token token : tokenMap.keySet())
+ for (Token token : metadata.keySet())
{
- Range range = getPrimaryRangeFor(token, tokenMap);
- HashSet<InetAddress> addresses = new HashSet<InetAddress>();
- for (InetAddress ep : getNaturalEndpoints(token, tokenMap))
+ Range range = getPrimaryRangeFor(token, metadata);
+ for (InetAddress ep : getNaturalEndpoints(token, metadata))
{
- addresses.add(ep);
+ map.put(range, ep);
}
- map.put(range, addresses);
}
return map;
}
- public Map<InetAddress, Set<Range>> getAddressRanges()
+ public Multimap<InetAddress, Range> getAddressRanges()
{
return getAddressRanges(tokenMetadata_.cloneTokenEndPointMap());
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Nov 3 02:23:53 2009
@@ -84,7 +84,7 @@
return partitioner_;
}
- public Set<Range> getLocalRanges()
+ public Collection<Range> getLocalRanges()
{
return getRangesForEndPoint(FBUtilities.getLocalAddress());
}
@@ -672,7 +672,7 @@
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
- Set<Range> getRangesForEndPoint(InetAddress ep)
+ Collection<Range> getRangesForEndPoint(InetAddress ep)
{
return replicationStrategy_.getAddressRanges().get(ep);
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Tue Nov 3 02:23:53 2009
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Collection;
import org.apache.commons.lang.StringUtils;
import static org.junit.Assert.assertEquals;
@@ -32,6 +33,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import com.google.common.collect.Multimap;
public class BootStrapperTest {
@Test
@@ -54,10 +56,10 @@
TokenMetadata tmd = ss.getTokenMetadata();
assertEquals(numOldNodes, tmd.cloneTokenEndPointMap().size());
BootStrapper b = new BootStrapper(ss.getReplicationStrategy(), myEndpoint, myToken, tmd);
- Map<Range, Set<InetAddress>> res = b.getRangesWithSources();
+ Multimap<Range, InetAddress> res = b.getRangesWithSources();
int transferCount = 0;
- for (Map.Entry<Range, Set<InetAddress>> e : res.entrySet())
+ for (Map.Entry<Range, Collection<InetAddress>> e : res.asMap().entrySet())
{
assert e.getValue() != null && e.getValue().size() > 0 : StringUtils.join(e.getValue(), ", ");
transferCount++;
@@ -77,9 +79,9 @@
public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
};
- Map<InetAddress, List<Range>> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
+ Multimap<InetAddress, Range> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
assertEquals(1, temp.keySet().size());
- assertEquals(1, temp.values().iterator().next().size());
+ assertEquals(1, temp.asMap().values().iterator().next().size());
assert !temp.keySet().iterator().next().equals(myEndpoint);
}