Author: jbellis
Date: Mon Dec  7 22:51:46 2009
New Revision: 888171

URL: http://svn.apache.org/viewvc?rev=888171&view=rev
Log:
implement streaming repairs; repair-via-rangecommand TODO
patch by Stu Hood; reviewed by jbellis for CASSANDRA-520

Removed:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Dec  7 22:51:46 2009
@@ -27,9 +27,9 @@
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.CompactionIterator.CompactedRow;
@@ -37,7 +37,7 @@
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.io.Streaming;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -47,7 +47,6 @@
 import org.apache.cassandra.utils.MerkleTree;
 
 import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
 
 import com.google.common.collect.Collections2;
 import com.google.common.base.Predicate;
@@ -158,11 +157,12 @@
         if (etrees == null)
         {
             // double check the creation
-            Cachetable<CFTuple, MerkleTree> probable =
-                new Cachetable<CFTuple, MerkleTree>(TREE_CACHE_LIFETIME);
+            Cachetable<CFTuple, MerkleTree> probable = new Cachetable<CFTuple, 
MerkleTree>(TREE_CACHE_LIFETIME);
             if ((etrees = trees.putIfAbsent(endpoint, probable)) == null)
+            {
                 // created new store for this endpoint
                 etrees = probable;
+            }
         }
         return etrees;
     }
@@ -189,12 +189,16 @@
             for (Map.Entry<InetAddress, Cachetable<CFTuple, MerkleTree>> entry 
: trees.entrySet())
             {
                 if (LOCAL.equals(entry.getKey()))
+                {
                     // don't compare to ourself
                     continue;
+                }
                 MerkleTree remotetree = entry.getValue().remove(cf);
                 if (remotetree == null)
+                {
                     // no tree stored for this endpoint at the moment
                     continue;
+                }
 
                 differencers.add(new Differencer(cf, LOCAL, entry.getKey(), 
tree, remotetree));
             }
@@ -206,8 +210,10 @@
             // we stored a remote tree: queue differencing for local tree
             MerkleTree localtree = cacheForEndpoint(LOCAL).get(cf);
             if (localtree != null)
+            {
                 // compare immediately
                 differencers.add(new Differencer(cf, LOCAL, endpoint, 
localtree, tree));
+            }
             else
             {
                 // cache for later comparison
@@ -317,12 +323,11 @@
         
         Validator(CFTuple cf, InetAddress initiator)
         {
-            this(cf, initiator,
+            this(cf,
+                 initiator,
                  // TODO: memory usage (maxsize) should either be tunable per
                  // CF, globally, or as shared for all CFs in a cluster
-                 new MerkleTree(DatabaseDescriptor.getPartitioner(),
-                                MerkleTree.RECOMMENDED_DEPTH,
-                                (int)Math.pow(2,15)));
+                 new MerkleTree(DatabaseDescriptor.getPartitioner(), 
MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
         }
 
         Validator(CFTuple cf, InetAddress initiator, MerkleTree tree)
@@ -340,17 +345,20 @@
         
         public void prepare()
         {
-            Predicate<SSTable> cfpred = new Predicate<SSTable>(){
+            Predicate<SSTable> cfpred = new Predicate<SSTable>()
+            {
                 public boolean apply(SSTable ss)
                 {
                     return cf.table.equals(ss.getTableName()) && 
cf.cf.equals(ss.getColumnFamilyName());
                 }
-                };
+            };
             List<DecoratedKey> keys = 
SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED);
 
             if (keys.isEmpty())
+            {
                 // use an even tree distribution
                 tree.init();
+            }
             else
             {
                 int numkeys = keys.size();
@@ -428,8 +436,7 @@
 
         private MerkleTree.RowHash rowHash(CompactedRow row)
         {
-            byte[] rowhash = FBUtilities.hash("MD5", row.key.key.getBytes(),
-                                                     row.buffer.getData());
+            byte[] rowhash = FBUtilities.hash("MD5", row.key.key.getBytes(), 
row.buffer.getData());
             return new MerkleTree.RowHash(row.key.token, rowhash);
         }
 
@@ -445,8 +452,7 @@
             while (ranges.hasNext())
             {
                 MerkleTree.TreeRange range = ranges.next();
-                if (!ranges.hasNext() && !minrows.isEmpty() &&
-                        range.contains(tree.partitioner().getMinimumToken()))
+                if (!ranges.hasNext() && !minrows.isEmpty() && 
range.contains(tree.partitioner().getMinimumToken()))
                 {
                     // append rows with the minimum token into the last range
                     rows.addAll(minrows);
@@ -474,16 +480,17 @@
             InetAddress local = FBUtilities.getLocalAddress();
             StorageService ss = StorageService.instance();
 
-            Collection<InetAddress> neighbors =
-                Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
-                                    Predicates.not(Predicates.equalTo(local)));
+            Collection<InetAddress> neighbors = 
Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+                                                                    
Predicates.not(Predicates.equalTo(local)));
 
             // cache the local tree
             aes.register(cf, local, tree);
 
             if (!local.equals(initiator))
+            {
                 // one of our neighbors initiated: broadcast the tree to all 
of them
                 aes.notifyNeighbors(this, local, neighbors);
+            }
             // else: we initiated this validation session: wait for responses
 
             // return any old object
@@ -532,7 +539,7 @@
         public final InetAddress remote;
         public final MerkleTree ltree;
         public final MerkleTree rtree;
-        public final List<Range> differences;
+        public final List<MerkleTree.TreeRange> differences;
 
         public Differencer(CFTuple cf, InetAddress local, InetAddress remote, 
MerkleTree ltree, MerkleTree rtree)
         {
@@ -541,7 +548,7 @@
             this.remote = remote;
             this.ltree = ltree;
             this.rtree = rtree;
-            differences = new ArrayList<Range>();
+            differences = new ArrayList<MerkleTree.TreeRange>();
         }
 
         /**
@@ -563,7 +570,7 @@
             interesting.retainAll(ss.getRangesForEndPoint(remote));
 
             // compare trees, and filter out uninteresting differences
-            for (Range diff : MerkleTree.difference(ltree, rtree))
+            for (MerkleTree.TreeRange diff : MerkleTree.difference(ltree, 
rtree))
             {
                 for (Range localrange: interesting)
                 {
@@ -575,13 +582,71 @@
                 }
             }
             
-            // TODO: calculating a percentage here would be all kinds of 
awesome
-            logger.info("Found " + differences.size() + " differing ranges 
between local " +
-                local + " and remote " + remote + " endpoints for " + cf + 
".");
+            // choose a repair method based on the significance of the 
difference
+            float difference = differenceFraction();
+            try
+            {
+                if (difference == 0.0)
+                {
+                    logger.debug("Endpoints " + local + " and " + remote + " 
are consistent for " + cf);
+                    return;
+                }
 
-            // FIXME: trigger repairs!
+                if (difference < 0.05)
+                    performRangeRepair();
+                else
+                    performStreamingRepair();
+            }
+            catch(IOException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
         
+        /**
+         * @return the fraction of the keyspace that is different, as represented by our
+         * list of different ranges. A range at depth 0 == 1.0, at depth 1 == 0.5, etc.
+         */
+        float differenceFraction()
+        {
+            double fraction = 0.0;
+            for (MerkleTree.TreeRange diff : differences)
+                fraction += 1.0 / Math.pow(2, diff.depth); // each differing range contributes 2^-depth
+            return (float)fraction;
+        }
+
+        /**
+         * Sends our list of differences to the remote endpoint using read
+         * repairs via the query API. Not implemented yet: the body only logs (see FIXME).
+         */
+        void performRangeRepair() throws IOException
+        {
+            logger.info("Performing range read repair of " + 
differences.size() + " ranges for " + cf);
+            // FIXME: the repair itself is still missing; nothing is sent between the two log calls
+            logger.debug("Finished range read repair for " + cf);
+        }
+
+        /**
+         * Sends our list of differences to the remote endpoint using the
+         * Streaming API. Any exception from submitAnti/transferSSTables is rethrown as an IOException wrapping the cause.
+         */
+        void performStreamingRepair() throws IOException
+        {
+            logger.info("Performing streaming repair of " + differences.size() 
+ " ranges to " + remote + " for " + cf);
+            ColumnFamilyStore cfstore = 
Table.open(cf.table).getColumnFamilyStore(cf.cf);
+            try
+            {
+                List<Range> ranges = new ArrayList<Range>(differences); // copy the TreeRanges into a List<Range> for submitAnti
+                List<SSTableReader> sstables = 
CompactionManager.instance().submitAnti(cfstore, ranges, remote).get();
+                Streaming.transferSSTables(remote, sstables, cf.table);
+            }
+            catch(Exception e)
+            {
+                throw new IOException("Streaming repair failed.", e);
+            }
+            logger.debug("Finished streaming repair to " + remote + " for " + 
cf);
+        }
+
         public String toString()
         {
             return "#<Differencer " + cf + " local=" + local + " remote=" + 
remote + ">";
@@ -604,8 +669,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(new CFTuple(table, cf), dos);
-                return new Message(FBUtilities.getLocalAddress(), 
AE_SERVICE_STAGE,
-                                   TREE_REQUEST_VERB, bos.toByteArray());
+                return new Message(FBUtilities.getLocalAddress(), 
AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
             }
             catch(IOException e)
             {
@@ -641,31 +705,27 @@
 
                 // check for cached local tree
                 InetAddress local = FBUtilities.getLocalAddress();
-                MerkleTree cached =
-                    AntiEntropyService.instance().getCachedTree(request.table,
-                                                                request.cf,
-                                                                local);
+                MerkleTree cached = 
AntiEntropyService.instance().getCachedTree(request.table, request.cf, local);
                 if (cached != null)
                 {
                     if (local.equals(message.getFrom()))
+                    {
                         // we are the requestor, and we already have a cached 
tree
                         return;
+                    }
                     // respond immediately with the recently generated tree
                     Validator valid = new Validator(request, 
message.getFrom(), cached);
                     Message response = TreeResponseVerbHandler.makeVerb(local, 
valid);
                     MessagingService.instance().sendOneWay(response, 
message.getFrom());
-                    logger.debug("Answered request from " + message.getFrom() +
-                                 " for " + request + " with cached tree.");
+                    logger.debug("Answered request from " + message.getFrom() 
+ " for " + request + " with cached tree.");
                     return;
                 }
 
                 // trigger readonly-compaction
-                logger.debug("Queueing readonly compaction for request from " +
-                             message.getFrom() + " for " + request);
+                logger.debug("Queueing readonly compaction for request from " 
+ message.getFrom() + " for " + request);
                 Table table = Table.open(request.table);
-                
CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf),
-                                                            message.getFrom());
-            }        
+                
CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf),
 message.getFrom());
+            }
             catch (Exception e)
             {
                 logger.warn(LogUtil.throwableToString(e));            
@@ -711,8 +771,7 @@
             ObjectInputStream ois = new ObjectInputStream(dis);
             try
             {
-                Validator v = new Validator(cf, (InetAddress)ois.readObject(),
-                                            (MerkleTree)ois.readObject());
+                Validator v = new Validator(cf, (InetAddress)ois.readObject(), 
(MerkleTree)ois.readObject());
                 return v;
             }
             catch(Exception e)
@@ -731,9 +790,8 @@
             {
                 // deserialize the remote tree, and register it
                 Validator rvalidator = this.deserialize(buffer);
-                AntiEntropyService.instance().register(rvalidator.cf, 
message.getFrom(),
-                                                       rvalidator.tree);
-            }        
+                AntiEntropyService.instance().register(rvalidator.cf, 
message.getFrom(), rvalidator.tree);
+            }
             catch (Exception e)
             {
                 logger.warn(LogUtil.throwableToString(e));            

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Mon Dec  7 22:51:46 2009
@@ -20,7 +20,10 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import java.net.InetAddress;
 
@@ -40,15 +43,16 @@
 {   
     private static Logger logger_ = Logger.getLogger( StreamManager.class );
         
-    private static Map<InetAddress, StreamManager> streamManagers_ = new 
HashMap<InetAddress, StreamManager>();
+    private static ConcurrentMap<InetAddress, StreamManager> streamManagers_ = 
new ConcurrentHashMap<InetAddress, StreamManager>();
     
     public static StreamManager instance(InetAddress to)
     {
         StreamManager streamManager = streamManagers_.get(to);
         if ( streamManager == null )
         {
-            streamManager = new StreamManager(to);
-            streamManagers_.put(to, streamManager);
+            StreamManager possibleNew = new StreamManager(to);
+            if ((streamManager = streamManagers_.putIfAbsent(to, possibleNew)) 
== null)
+                streamManager = possibleNew;
         }
         return streamManager;
     }
@@ -79,7 +83,7 @@
         {
             File file = filesToStream_.get(0);
             if (logger_.isDebugEnabled())
-              logger_.debug("Streaming file " + file + " ...");
+              logger_.debug("Streaming " + file.length() + " length file " + 
file + " ...");
             MessagingService.instance().stream(file.getAbsolutePath(), 0L, 
file.length(), FBUtilities.getLocalAddress(), to_);
         }
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java Mon Dec  7 22:51:46 2009
@@ -168,11 +168,11 @@
      * @param active Only ranges that intersect this range will be returned.
      * @return A list of the largest contiguous ranges where the given trees 
disagree.
      */
-    public static List<Range> difference(MerkleTree ltree, MerkleTree rtree)
+    public static List<TreeRange> difference(MerkleTree ltree, MerkleTree 
rtree)
     {
-        List<Range> diff = new ArrayList<Range>();
+        List<TreeRange> diff = new ArrayList<TreeRange>();
         Token mintoken = ltree.partitioner.getMinimumToken();
-        Range active = new Range(mintoken, mintoken);
+        TreeRange active = new TreeRange(null, mintoken, mintoken, (byte)0, 
null);
         
         byte[] lhash = ltree.hash(active);
         byte[] rhash = rtree.hash(active);
@@ -194,11 +194,11 @@
      * Takes two trees and a range for which they have hashes, but are 
inconsistent.
      * @return FULLY_INCONSISTENT if active is inconsistent, 
PARTIALLY_INCONSISTENT if only a subrange is inconsistent.
      */
-    static int differenceHelper(MerkleTree ltree, MerkleTree rtree, 
List<Range> diff, Range active)
+    static int differenceHelper(MerkleTree ltree, MerkleTree rtree, 
List<TreeRange> diff, TreeRange active)
     {
         Token midpoint = ltree.partitioner().midpoint(active.left(), 
active.right());
-        Range left = new Range(active.left(), midpoint);
-        Range right = new Range(midpoint, active.right());
+        TreeRange left = new TreeRange(null, active.left(), midpoint, 
inc(active.depth), null);
+        TreeRange right = new TreeRange(null, midpoint, active.right(), 
inc(active.depth), null);
         byte[] lhash;
         byte[] rhash;
         
@@ -471,7 +471,8 @@
      *
      * NB: A TreeRange should not be returned by a public method unless the
      * parents of the range it represents are already invalidated, since it
-     * will allow someone to modify the hash.
+     * will allow someone to modify the hash. Alternatively, a TreeRange
+     * may be created with a null tree, indicating that it is read only.
      */
     public static class TreeRange extends Range
     {
@@ -489,6 +490,7 @@
 
         public void hash(byte[] hash)
         {
+            assert tree != null : "Not intended for modification!";
             hashable.hash(hash);
         }
 
@@ -512,6 +514,7 @@
          */
         public void validate(PeekingIterator<RowHash> entries)
         {
+            assert tree != null : "Not intended for modification!";
             assert hashable instanceof Leaf;
             byte[] roothash;
             try
@@ -587,7 +590,7 @@
         {
             StringBuilder buff = new StringBuilder("#<TreeRange ");
             buff.append(super.toString()).append(" depth=").append(depth);
-            return buff.append(" 
hash=").append(hashable.hash()).append(">").toString();
+            return buff.append(">").toString();
         }
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java Mon Dec  7 22:51:46 2009
@@ -43,8 +43,7 @@
 
         TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
         map.put(key, bytes);
-        SSTableReader ssTable = SSTableUtils.writeSSTable("singlewrite", map, 
1,
-                                                          new 
OrderPreservingPartitioner(), 0.01);
+        SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", 
"singlewrite", map);
 
         // verify
         verifySingle(ssTable, bytes, key);
@@ -72,8 +71,7 @@
         }
 
         // write
-        SSTableReader ssTable = SSTableUtils.writeSSTable("manywrites", map, 
1000,
-                                                          new 
OrderPreservingPartitioner(), 0.01);
+        SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", 
"manywrites", map);
 
         // verify
         verifyMany(ssTable, map);
@@ -109,9 +107,7 @@
         }
 
         // write
-        SSTableReader ssTable = SSTableUtils.writeSSTable(ssname, map, 1000,
-                                                          new 
OrderPreservingPartitioner(), 0.01);
-
+        SSTableReader ssTable = SSTableUtils.writeRawSSTable("table", ssname, 
map);
 
         // verify
         Predicate<SSTable> cfpred;

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableUtils.java Mon Dec  7 22:51:46 2009
@@ -23,24 +23,85 @@
 
 import java.util.Map;
 import java.util.SortedMap;
+import java.util.Set;
+import java.util.TreeMap;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
 
+/**
+ * TODO: These methods imitate Memtable.writeSortedKeys to some degree, but
+ * because it is so monolithic, we can't reuse much.
+ */
 public class SSTableUtils
 {
-    public static File tempSSTableFileName(String cfname) throws IOException
+    // first configured table and cf
+    public static String TABLENAME;
+    public static String CFNAME;
+    static
     {
-        return File.createTempFile(cfname + "-", "-" + SSTable.TEMPFILE_MARKER 
+ "-Data.db");
+        try
+        {
+            TABLENAME = DatabaseDescriptor.getTables().get(0);
+            CFNAME = 
Table.open(TABLENAME).getColumnFamilies().iterator().next();
+        }
+        catch(IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
-    public static SSTableReader writeSSTable(String cfname, SortedMap<String, 
byte[]> entries, int expectedKeys, IPartitioner partitioner, double 
cacheFraction) throws IOException
+    public static File tempSSTableFile(String tablename, String cfname) throws 
IOException
     {
-        File f = tempSSTableFileName(cfname);
-        SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), 
expectedKeys, partitioner);
-        for (Map.Entry<String, byte[]> entry : entries.entrySet())
+        File tempdir = File.createTempFile(tablename, cfname);
+        if(!tempdir.delete() || !tempdir.mkdir())
+            throw new IOException("Temporary directory creation failed.");
+        tempdir.deleteOnExit();
+        File tabledir = new File(tempdir, tablename);
+        tabledir.mkdir();
+        tabledir.deleteOnExit();
+        return File.createTempFile(cfname + "-",
+                                   "-" + SSTable.TEMPFILE_MARKER + "-Data.db",
+                                   tabledir);
+    }
+
+    public static SSTableReader writeSSTable(Set<String> keys) throws 
IOException
+    {
+        TreeMap<String, ColumnFamily> map = new TreeMap<String, 
ColumnFamily>();
+        for (String key : keys)
         {
-            writer.append(writer.partitioner.decorateKey(entry.getKey()), 
entry.getValue());
+            ColumnFamily cf = ColumnFamily.create(TABLENAME, CFNAME);
+            cf.addColumn(new Column(key.getBytes(), key.getBytes(), 0));
+            map.put(key, cf);
         }
-        return writer.closeAndOpenReader(cacheFraction);
+        return writeSSTable(map);
+    }
+
+    public static SSTableReader writeSSTable(SortedMap<String, ColumnFamily> 
entries) throws IOException
+    {
+        TreeMap<String, byte[]> map = new TreeMap<String, byte[]>();
+        for (Map.Entry<String, ColumnFamily> entry : entries.entrySet())
+        {
+            DataOutputBuffer buffer = new DataOutputBuffer();
+            ColumnFamily.serializer().serializeWithIndexes(entry.getValue(), 
buffer);
+            map.put(entry.getKey(), buffer.getData());
+        }
+        return writeRawSSTable(TABLENAME, CFNAME, map);
+    }
+
+    public static SSTableReader writeRawSSTable(String tablename, String 
cfname, SortedMap<String, byte[]> entries) throws IOException
+    {
+        File f = tempSSTableFile(tablename, cfname);
+        SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), 
entries.size(), StorageService.getPartitioner());
+        for (Map.Entry<String, byte[]> entry : entries.entrySet())
+            writer.append(writer.partitioner.decorateKey(entry.getKey()),
+                          entry.getValue());
+        new File(writer.indexFilename()).deleteOnExit();
+        new File(writer.filterFilename()).deleteOnExit();
+        return writer.closeAndOpenReader(1.0);
     }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Mon Dec  7 22:51:46 2009
@@ -43,7 +43,7 @@
     @Test
     public void testTransferTable() throws Exception
     {
-        StorageService.instance().start();
+        StorageService.instance().initServer();
 
         // write a temporary SSTable, but don't register it
         Set<String> content = new HashSet<String>();

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Mon Dec  7 22:51:46 2009
@@ -91,18 +91,13 @@
     {
         Validator validator;
 
-        // open an SSTable to give us something to sample
-        TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
-        for ( int i = 0; i < 1000; i++ )
-        {
-            map.put(Integer.toString(i), "blah".getBytes());
-        }
-
         // write
-        SSTableReader ssTable =
-            SSTableUtils.writeSSTable(cfname, map, 1000,
-                                      
StorageService.instance().getPartitioner(), 0.01);
-        tablename = ssTable.getTableName();
+        List<RowMutation> rms = new LinkedList<RowMutation>();
+        RowMutation rm;
+        rm = new RowMutation(tablename, "key1");
+        rm.add(new QueryPath(cfname, null, "Column1".getBytes()), 
"asdf".getBytes(), 0);
+        rms.add(rm);
+        ColumnFamilyStoreUtils.writeColumnFamily(rms);
 
         // sample
         validator = new Validator(new CFTuple(tablename, cfname), LOCAL);

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java?rev=888171&r1=888170&r2=888171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java Mon Dec  7 22:51:46 2009
@@ -527,7 +527,7 @@
             range.validate(new HIterator(range.right()));
         
         // trees should disagree for leftmost, (middle.left, rightmost.right]
-        List<Range> diffs = MerkleTree.difference(mt, mt2);
+        List<TreeRange> diffs = MerkleTree.difference(mt, mt2);
         assertEquals(diffs + " contains wrong number of differences:", 2, 
diffs.size());
         assertTrue(diffs.contains(leftmost));
         assertTrue(diffs.contains(new Range(middle.left(), 
rightmost.right())));


Reply via email to