Author: jbellis
Date: Mon Dec 7 22:51:46 2009
New Revision: 888171
URL: http://svn.apache.org/viewvc?rev=888171&view=rev
Log:
implement streaming repairs; repair-via-rangecommand TODO
patch by Stu Hood; reviewed by jbellis for CASSANDRA-520
Removed:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Dec 7 22:51:46 2009
@@ -27,9 +27,9 @@
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator.CompactedRow;
@@ -37,7 +37,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.io.Streaming;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -47,7 +47,6 @@
import org.apache.cassandra.utils.MerkleTree;
import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
import com.google.common.collect.Collections2;
import com.google.common.base.Predicate;
@@ -158,11 +157,12 @@
if (etrees == null)
{
// double check the creation
- Cachetable<CFTuple, MerkleTree> probable =
- new Cachetable<CFTuple, MerkleTree>(TREE_CACHE_LIFETIME);
+ Cachetable<CFTuple, MerkleTree> probable = new Cachetable<CFTuple, MerkleTree>(TREE_CACHE_LIFETIME);
if ((etrees = trees.putIfAbsent(endpoint, probable)) == null)
+ {
// created new store for this endpoint
etrees = probable;
+ }
}
return etrees;
}
@@ -189,12 +189,16 @@
for (Map.Entry<InetAddress, Cachetable<CFTuple, MerkleTree>> entry
: trees.entrySet())
{
if (LOCAL.equals(entry.getKey()))
+ {
// don't compare to ourself
continue;
+ }
MerkleTree remotetree = entry.getValue().remove(cf);
if (remotetree == null)
+ {
// no tree stored for this endpoint at the moment
continue;
+ }
differencers.add(new Differencer(cf, LOCAL, entry.getKey(),
tree, remotetree));
}
@@ -206,8 +210,10 @@
// we stored a remote tree: queue differencing for local tree
MerkleTree localtree = cacheForEndpoint(LOCAL).get(cf);
if (localtree != null)
+ {
// compare immediately
+ differencers.add(new Differencer(cf, LOCAL, endpoint, localtree, tree));
+ }
else
{
// cache for later comparison
@@ -317,12 +323,11 @@
Validator(CFTuple cf, InetAddress initiator)
{
- this(cf, initiator,
+ this(cf,
+ initiator,
// TODO: memory usage (maxsize) should either be tunable per
// CF, globally, or as shared for all CFs in a cluster
- new MerkleTree(DatabaseDescriptor.getPartitioner(),
- MerkleTree.RECOMMENDED_DEPTH,
- (int)Math.pow(2,15)));
+ new MerkleTree(DatabaseDescriptor.getPartitioner(), MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
}
Validator(CFTuple cf, InetAddress initiator, MerkleTree tree)
@@ -340,17 +345,20 @@
public void prepare()
{
- Predicate<SSTable> cfpred = new Predicate<SSTable>(){
+ Predicate<SSTable> cfpred = new Predicate<SSTable>()
+ {
public boolean apply(SSTable ss)
{
return cf.table.equals(ss.getTableName()) &&
cf.cf.equals(ss.getColumnFamilyName());
}
- };
+ };
List<DecoratedKey> keys =
SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED);
if (keys.isEmpty())
+ {
// use an even tree distribution
tree.init();
+ }
else
{
int numkeys = keys.size();
@@ -428,8 +436,7 @@
private MerkleTree.RowHash rowHash(CompactedRow row)
{
- byte[] rowhash = FBUtilities.hash("MD5", row.key.key.getBytes(),
- row.buffer.getData());
+ byte[] rowhash = FBUtilities.hash("MD5", row.key.key.getBytes(), row.buffer.getData());
return new MerkleTree.RowHash(row.key.token, rowhash);
}
@@ -445,8 +452,7 @@
while (ranges.hasNext())
{
MerkleTree.TreeRange range = ranges.next();
- if (!ranges.hasNext() && !minrows.isEmpty() &&
- range.contains(tree.partitioner().getMinimumToken()))
+ if (!ranges.hasNext() && !minrows.isEmpty() && range.contains(tree.partitioner().getMinimumToken()))
{
// append rows with the minimum token into the last range
rows.addAll(minrows);
@@ -474,16 +480,17 @@
InetAddress local = FBUtilities.getLocalAddress();
StorageService ss = StorageService.instance();
- Collection<InetAddress> neighbors =
- Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
- Predicates.not(Predicates.equalTo(local)));
+ Collection<InetAddress> neighbors = Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+ Predicates.not(Predicates.equalTo(local)));
// cache the local tree
aes.register(cf, local, tree);
if (!local.equals(initiator))
+ {
// one of our neighbors initiated: broadcast the tree to all of them
aes.notifyNeighbors(this, local, neighbors);
+ }
// else: we initiated this validation session: wait for responses
// return any old object
@@ -532,7 +539,7 @@
public final InetAddress remote;
public final MerkleTree ltree;
public final MerkleTree rtree;
- public final List<Range> differences;
+ public final List<MerkleTree.TreeRange> differences;
public Differencer(CFTuple cf, InetAddress local, InetAddress remote,
MerkleTree ltree, MerkleTree rtree)
{
@@ -541,7 +548,7 @@
this.remote = remote;
this.ltree = ltree;
this.rtree = rtree;
- differences = new ArrayList<Range>();
+ differences = new ArrayList<MerkleTree.TreeRange>();
}
/**
@@ -563,7 +570,7 @@
interesting.retainAll(ss.getRangesForEndPoint(remote));
// compare trees, and filter out uninteresting differences
- for (Range diff : MerkleTree.difference(ltree, rtree))
+ for (MerkleTree.TreeRange diff : MerkleTree.difference(ltree, rtree))
{
for (Range localrange: interesting)
{
@@ -575,13 +582,71 @@
}
}
- // TODO: calculating a percentage here would be all kinds of awesome
- logger.info("Found " + differences.size() + " differing ranges between local " +
- local + " and remote " + remote + " endpoints for " + cf + ".");
+ // choose a repair method based on the significance of the difference
+ float difference = differenceFraction();
+ try
+ {
+ if (difference == 0.0)
+ {
+ logger.debug("Endpoints " + local + " and " + remote + "
are consistent for " + cf);
+ return;
+ }
- // FIXME: trigger repairs!
+ if (difference < 0.05)
+ performRangeRepair();
+ else
+ performStreamingRepair();
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
+ /**
+ * @return the fraction of the keyspace that is different, as represented by our
+ * list of different ranges. A range at depth 0 == 1.0, at depth 1 == 0.5, etc.
+ */
+ float differenceFraction()
+ {
+ double fraction = 0.0;
+ for (MerkleTree.TreeRange diff : differences)
+ fraction += 1.0 / Math.pow(2, diff.depth);
+ return (float)fraction;
+ }
+
+ /**
+ * Sends our list of differences to the remote endpoint using read
+ * repairs via the query API.
+ */
+ void performRangeRepair() throws IOException
+ {
+ logger.info("Performing range read repair of " +
differences.size() + " ranges for " + cf);
+ // FIXME
+ logger.debug("Finished range read repair for " + cf);
+ }
+
+ /**
+ * Sends our list of differences to the remote endpoint using the
+ * Streaming API.
+ */
+ void performStreamingRepair() throws IOException
+ {
+ logger.info("Performing streaming repair of " + differences.size()
+ " ranges to " + remote + " for " + cf);
+ ColumnFamilyStore cfstore =
Table.open(cf.table).getColumnFamilyStore(cf.cf);
+ try
+ {
+ List<Range> ranges = new ArrayList<Range>(differences);
+ List<SSTableReader> sstables = CompactionManager.instance().submitAnti(cfstore, ranges, remote).get();
+ Streaming.transferSSTables(remote, sstables, cf.table);
+ }
+ catch(Exception e)
+ {
+ throw new IOException("Streaming repair failed.", e);
+ }
+ logger.debug("Finished streaming repair to " + remote + " for " +
cf);
+ }
+
public String toString()
{
return "#<Differencer " + cf + " local=" + local + " remote=" +
remote + ">";
@@ -604,8 +669,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(new CFTuple(table, cf), dos);
- return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE,
- TREE_REQUEST_VERB, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
}
catch(IOException e)
{
@@ -641,31 +705,27 @@
// check for cached local tree
InetAddress local = FBUtilities.getLocalAddress();
- MerkleTree cached =
- AntiEntropyService.instance().getCachedTree(request.table,
- request.cf,
- local);
+ MerkleTree cached = AntiEntropyService.instance().getCachedTree(request.table, request.cf, local);
if (cached != null)
{
if (local.equals(message.getFrom()))
+ {
// we are the requestor, and we already have a cached tree
return;
+ }
// respond immediately with the recently generated tree
Validator valid = new Validator(request,
message.getFrom(), cached);
Message response = TreeResponseVerbHandler.makeVerb(local,
valid);
MessagingService.instance().sendOneWay(response,
message.getFrom());
- logger.debug("Answered request from " + message.getFrom() +
- " for " + request + " with cached tree.");
+ logger.debug("Answered request from " + message.getFrom()
+ " for " + request + " with cached tree.");
return;
}
// trigger readonly-compaction
- logger.debug("Queueing readonly compaction for request from " +
- message.getFrom() + " for " + request);
+ logger.debug("Queueing readonly compaction for request from "
+ message.getFrom() + " for " + request);
Table table = Table.open(request.table);
- CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf),
- message.getFrom());
- }
+ CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom());
+ }
catch (Exception e)
{
logger.warn(LogUtil.throwableToString(e));
@@ -711,8 +771,7 @@
ObjectInputStream ois = new ObjectInputStream(dis);
try
{
- Validator v = new Validator(cf, (InetAddress)ois.readObject(),
- (MerkleTree)ois.readObject());
+ Validator v = new Validator(cf, (InetAddress)ois.readObject(), (MerkleTree)ois.readObject());
return v;
}
catch(Exception e)
@@ -731,9 +790,8 @@
{
// deserialize the remote tree, and register it
Validator rvalidator = this.deserialize(buffer);
- AntiEntropyService.instance().register(rvalidator.cf, message.getFrom(),
- rvalidator.tree);
- }
+ AntiEntropyService.instance().register(rvalidator.cf, message.getFrom(), rvalidator.tree);
+ }
catch (Exception e)
{
logger.warn(LogUtil.throwableToString(e));
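For illustration outside the patch: a self-contained sketch of the repair-selection logic that the new Differencer code above implements. The 1/2^depth weighting and the 0.05 threshold come from the diff; the RepairChooser class and its method names are invented for this sketch, not part of AntiEntropyService.

    import java.util.Arrays;
    import java.util.List;

    // Sketch only: mirrors Differencer.differenceFraction() and the repair
    // choice made in Differencer.run(), detached from the Cassandra classes.
    class RepairChooser
    {
        // A differing range at tree depth d covers 1/2^d of the token space,
        // so summing 1/2^depth over all differing ranges estimates how much
        // of the keyspace disagrees.
        static float differenceFraction(List<Integer> diffDepths)
        {
            double fraction = 0.0;
            for (int depth : diffDepths)
                fraction += 1.0 / Math.pow(2, depth);
            return (float)fraction;
        }

        // Same decision as the patch: nothing to repair, range read repair for
        // small differences, streaming repair once more than 5% differs.
        static String choose(List<Integer> diffDepths)
        {
            float difference = differenceFraction(diffDepths);
            if (difference == 0.0)
                return "consistent";
            return difference < 0.05 ? "range repair" : "streaming repair";
        }

        public static void main(String[] args)
        {
            // two differing ranges at depth 4 -> 2 * 1/16 = 0.125 -> streaming repair
            System.out.println(choose(Arrays.asList(4, 4)));
            // one differing range at depth 6 -> 1/64 ~ 0.016 -> range repair
            System.out.println(choose(Arrays.asList(6)));
        }
    }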
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Mon Dec 7 22:51:46 2009
@@ -20,7 +20,10 @@
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.net.InetAddress;
@@ -40,15 +43,16 @@
{
private static Logger logger_ = Logger.getLogger( StreamManager.class );
- private static Map<InetAddress, StreamManager> streamManagers_ = new HashMap<InetAddress, StreamManager>();
+ private static ConcurrentMap<InetAddress, StreamManager> streamManagers_ = new ConcurrentHashMap<InetAddress, StreamManager>();
public static StreamManager instance(InetAddress to)
{
StreamManager streamManager = streamManagers_.get(to);
if ( streamManager == null )
{
- streamManager = new StreamManager(to);
- streamManagers_.put(to, streamManager);
+ StreamManager possibleNew = new StreamManager(to);
+ if ((streamManager = streamManagers_.putIfAbsent(to, possibleNew)) == null)
+ streamManager = possibleNew;
}
return streamManager;
}
@@ -79,7 +83,7 @@
{
File file = filesToStream_.get(0);
if (logger_.isDebugEnabled())
- logger_.debug("Streaming file " + file + " ...");
+ logger_.debug("Streaming " + file.length() + " length file " +
file + " ...");
MessagingService.instance().stream(file.getAbsolutePath(), 0L,
file.length(), FBUtilities.getLocalAddress(), to_);
}
}
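A side note on the StreamManager change just above: replacing the HashMap with ConcurrentMap.putIfAbsent makes instance(to) safe to call from multiple threads without locking. A generic sketch of that idiom follows; PerEndpointRegistry and Factory are hypothetical names, not the real classes.

    import java.net.InetAddress;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch of the check-then-putIfAbsent idiom used in StreamManager.instance():
    // at most one value per endpoint survives, even under concurrent callers.
    class PerEndpointRegistry<V>
    {
        interface Factory<T>
        {
            T create(InetAddress to);
        }

        private final ConcurrentMap<InetAddress, V> instances = new ConcurrentHashMap<InetAddress, V>();

        V instance(InetAddress to, Factory<V> factory)
        {
            V existing = instances.get(to);
            if (existing == null)
            {
                V possibleNew = factory.create(to);
                // putIfAbsent returns the value that was already mapped,
                // or null if our candidate won the race and was stored
                existing = instances.putIfAbsent(to, possibleNew);
                if (existing == null)
                    existing = possibleNew;
            }
            return existing;
        }
    }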
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java Mon Dec 7 22:51:46 2009
@@ -168,11 +168,11 @@
* @param active Only ranges that intersect this range will be returned.
* @return A list of the largest contiguous ranges where the given trees disagree.
*/
- public static List<Range> difference(MerkleTree ltree, MerkleTree rtree)
+ public static List<TreeRange> difference(MerkleTree ltree, MerkleTree rtree)
{
- List<Range> diff = new ArrayList<Range>();
+ List<TreeRange> diff = new ArrayList<TreeRange>();
Token mintoken = ltree.partitioner.getMinimumToken();
- Range active = new Range(mintoken, mintoken);
+ TreeRange active = new TreeRange(null, mintoken, mintoken, (byte)0, null);
byte[] lhash = ltree.hash(active);
byte[] rhash = rtree.hash(active);
@@ -194,11 +194,11 @@
* Takes two trees and a range for which they have hashes, but are inconsistent.
* @return FULLY_INCONSISTENT if active is inconsistent, PARTIALLY_INCONSISTENT if only a subrange is inconsistent.
*/
- static int differenceHelper(MerkleTree ltree, MerkleTree rtree, List<Range> diff, Range active)
+ static int differenceHelper(MerkleTree ltree, MerkleTree rtree, List<TreeRange> diff, TreeRange active)
{
Token midpoint = ltree.partitioner().midpoint(active.left(),
active.right());
- Range left = new Range(active.left(), midpoint);
- Range right = new Range(midpoint, active.right());
+ TreeRange left = new TreeRange(null, active.left(), midpoint, inc(active.depth), null);
+ TreeRange right = new TreeRange(null, midpoint, active.right(), inc(active.depth), null);
byte[] lhash;
byte[] rhash;
@@ -471,7 +471,8 @@
*
* NB: A TreeRange should not be returned by a public method unless the
* parents of the range it represents are already invalidated, since it
- * will allow someone to modify the hash.
+ * will allow someone to modify the hash. Alternatively, a TreeRange
+ * may be created with a null tree, indicating that it is read only.
*/
public static class TreeRange extends Range
{
@@ -489,6 +490,7 @@
public void hash(byte[] hash)
{
+ assert tree != null : "Not intended for modification!";
hashable.hash(hash);
}
@@ -512,6 +514,7 @@
*/
public void validate(PeekingIterator<RowHash> entries)
{
+ assert tree != null : "Not intended for modification!";
assert hashable instanceof Leaf;
byte[] roothash;
try
@@ -587,7 +590,7 @@
{
StringBuilder buff = new StringBuilder("#<TreeRange ");
buff.append(super.toString()).append(" depth=").append(depth);
- return buff.append("
hash=").append(hashable.hash()).append(">").toString();
+ return buff.append(">").toString();
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java Mon Dec 7 22:51:46 2009
@@ -43,8 +43,7 @@
TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
map.put(key, bytes);
- SSTableReader ssTable = SSTableUtils.writeSSTable("singlewrite", map,
1,
- new
OrderPreservingPartitioner(), 0.01);
+ SSTableReader ssTable = SSTableUtils.writeRawSSTable("table",
"singlewrite", map);
// verify
verifySingle(ssTable, bytes, key);
@@ -72,8 +71,7 @@
}
// write
- SSTableReader ssTable = SSTableUtils.writeSSTable("manywrites", map,
1000,
- new
OrderPreservingPartitioner(), 0.01);
+ SSTableReader ssTable = SSTableUtils.writeRawSSTable("table",
"manywrites", map);
// verify
verifyMany(ssTable, map);
@@ -109,9 +107,7 @@
}
// write
- SSTableReader ssTable = SSTableUtils.writeSSTable(ssname, map, 1000,
- new OrderPreservingPartitioner(), 0.01);
-
+ SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", ssname, map);
// verify
Predicate<SSTable> cfpred;
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java Mon Dec 7 22:51:46 2009
@@ -23,24 +23,85 @@
import java.util.Map;
import java.util.SortedMap;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
+/**
+ * TODO: These methods imitate Memtable.writeSortedKeys to some degree, but
+ * because it is so monolithic, we can't reuse much.
+ */
public class SSTableUtils
{
- public static File tempSSTableFileName(String cfname) throws IOException
+ // first configured table and cf
+ public static String TABLENAME;
+ public static String CFNAME;
+ static
{
- return File.createTempFile(cfname + "-", "-" + SSTable.TEMPFILE_MARKER + "-Data.db");
+ try
+ {
+ TABLENAME = DatabaseDescriptor.getTables().get(0);
+ CFNAME = Table.open(TABLENAME).getColumnFamilies().iterator().next();
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- public static SSTableReader writeSSTable(String cfname, SortedMap<String, byte[]> entries, int expectedKeys, IPartitioner partitioner, double cacheFraction) throws IOException
+ public static File tempSSTableFile(String tablename, String cfname) throws IOException
{
- File f = tempSSTableFileName(cfname);
- SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), expectedKeys, partitioner);
- for (Map.Entry<String, byte[]> entry : entries.entrySet())
+ File tempdir = File.createTempFile(tablename, cfname);
+ if(!tempdir.delete() || !tempdir.mkdir())
+ throw new IOException("Temporary directory creation failed.");
+ tempdir.deleteOnExit();
+ File tabledir = new File(tempdir, tablename);
+ tabledir.mkdir();
+ tabledir.deleteOnExit();
+ return File.createTempFile(cfname + "-",
+ "-" + SSTable.TEMPFILE_MARKER + "-Data.db",
+ tabledir);
+ }
+
+ public static SSTableReader writeSSTable(Set<String> keys) throws IOException
+ {
+ TreeMap<String, ColumnFamily> map = new TreeMap<String, ColumnFamily>();
+ for (String key : keys)
{
- writer.append(writer.partitioner.decorateKey(entry.getKey()), entry.getValue());
+ ColumnFamily cf = ColumnFamily.create(TABLENAME, CFNAME);
+ cf.addColumn(new Column(key.getBytes(), key.getBytes(), 0));
+ map.put(key, cf);
}
- return writer.closeAndOpenReader(cacheFraction);
+ return writeSSTable(map);
+ }
+
+ public static SSTableReader writeSSTable(SortedMap<String, ColumnFamily> entries) throws IOException
+ {
+ TreeMap<String, byte[]> map = new TreeMap<String, byte[]>();
+ for (Map.Entry<String, ColumnFamily> entry : entries.entrySet())
+ {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ ColumnFamily.serializer().serializeWithIndexes(entry.getValue(), buffer);
+ map.put(entry.getKey(), buffer.getData());
+ }
+ return writeRawSSTable(TABLENAME, CFNAME, map);
+ }
+
+ public static SSTableReader writeRawSSTable(String tablename, String cfname, SortedMap<String, byte[]> entries) throws IOException
+ {
+ File f = tempSSTableFile(tablename, cfname);
+ SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), entries.size(), StorageService.getPartitioner());
+ for (Map.Entry<String, byte[]> entry : entries.entrySet())
+ writer.append(writer.partitioner.decorateKey(entry.getKey()),
+ entry.getValue());
+ new File(writer.indexFilename()).deleteOnExit();
+ new File(writer.filterFilename()).deleteOnExit();
+ return writer.closeAndOpenReader(1.0);
}
}
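The new SSTableUtils helpers would be exercised roughly as follows. This is a usage sketch, not code from the patch, and it assumes the unit-test configuration (a storage-conf with at least one table) is on the classpath so the TABLENAME/CFNAME static initializer succeeds.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.TreeMap;

    import org.apache.cassandra.io.SSTableReader;
    import org.apache.cassandra.io.SSTableUtils;

    // Usage sketch for the helpers added above (not part of the patch).
    public class SSTableUtilsUsageSketch
    {
        public static void main(String[] args) throws Exception
        {
            // High-level path: one column named after each key, written into the
            // first configured table/CF (SSTableUtils.TABLENAME / CFNAME).
            Set<String> keys = new HashSet<String>();
            keys.add("key1");
            keys.add("key2");
            SSTableReader reader = SSTableUtils.writeSSTable(keys);

            // Low-level path: the caller supplies already-serialized row buffers.
            // The empty byte[] is a placeholder for illustration only; real callers
            // would pass a ColumnFamily serialized with serializeWithIndexes().
            TreeMap<String, byte[]> raw = new TreeMap<String, byte[]>();
            raw.put("key3", new byte[0]);
            SSTableReader rawReader = SSTableUtils.writeRawSSTable(SSTableUtils.TABLENAME, SSTableUtils.CFNAME, raw);
        }
    }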
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Mon Dec 7 22:51:46 2009
@@ -43,7 +43,7 @@
@Test
public void testTransferTable() throws Exception
{
- StorageService.instance().start();
+ StorageService.instance().initServer();
// write a temporary SSTable, but don't register it
Set<String> content = new HashSet<String>();
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Mon Dec 7 22:51:46 2009
@@ -91,18 +91,13 @@
{
Validator validator;
- // open an SSTable to give us something to sample
- TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
- for ( int i = 0; i < 1000; i++ )
- {
- map.put(Integer.toString(i), "blah".getBytes());
- }
-
// write
- SSTableReader ssTable =
- SSTableUtils.writeSSTable(cfname, map, 1000,
- StorageService.instance().getPartitioner(), 0.01);
- tablename = ssTable.getTableName();
+ List<RowMutation> rms = new LinkedList<RowMutation>();
+ RowMutation rm;
+ rm = new RowMutation(tablename, "key1");
+ rm.add(new QueryPath(cfname, null, "Column1".getBytes()), "asdf".getBytes(), 0);
+ rms.add(rm);
+ ColumnFamilyStoreUtils.writeColumnFamily(rms);
// sample
validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java Mon Dec 7 22:51:46 2009
@@ -527,7 +527,7 @@
range.validate(new HIterator(range.right()));
// trees should disagree for leftmost, (middle.left, rightmost.right]
- List<Range> diffs = MerkleTree.difference(mt, mt2);
+ List<TreeRange> diffs = MerkleTree.difference(mt, mt2);
assertEquals(diffs + " contains wrong number of differences:", 2,
diffs.size());
assertTrue(diffs.contains(leftmost));
assertTrue(diffs.contains(new Range(middle.left(),
rightmost.right())));