Author: jbellis
Date: Sat Dec  5 00:22:31 2009
New Revision: 887463

URL: http://svn.apache.org/viewvc?rev=887463&view=rev
Log:
Remove misguided attempt at optimizing the merging of range scan results from
multiple nodes
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java   (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java 
Sat Dec  5 00:22:31 2009
@@ -397,6 +397,14 @@
                : DatabaseDescriptor.getSubComparator(table, columnFamilyName);
     }
 
+    public static ColumnFamily resolve(ColumnFamily cf1, ColumnFamily cf2)
+    {
+        if (cf1 == null)
+            return cf2;
+        cf1.resolve(cf2);
+        return cf1;
+    }
+
     public void resolve(ColumnFamily cf)
     {
         // Row _does_ allow null CF objects :(  seems a necessary evil for 
efficiency

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java 
Sat Dec  5 00:22:31 2009
@@ -157,12 +157,11 @@
         {
             int oldSize = oldCf.size();
             int oldObjectCount = oldCf.getColumnCount();
-            oldCf.addAll(columnFamily);
+            oldCf.resolve(columnFamily);
             int newSize = oldCf.size();
             int newObjectCount = oldCf.getColumnCount();
             resolveSize(oldSize, newSize);
             resolveCount(oldObjectCount, newObjectCount);
-            oldCf.delete(columnFamily);
         }
     }
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
 Sat Dec  5 00:22:31 2009
@@ -34,6 +34,7 @@
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.Pair;
 import org.apache.thrift.TException;
 
 import flexjson.JSONSerializer;
@@ -568,24 +569,23 @@
             throw new InvalidRequestException("maxRows must be positive");
         }
 
-        Map<String, Collection<IColumn>> colMap; // keys are sorted.
+        List<Pair<String,Collection<IColumn>>> rows;
         try
         {
-            colMap = StorageProxy.getRangeSlice(new 
RangeSliceCommand(keyspace, column_parent, predicate, start_key, finish_key, 
maxRows));
-            if (colMap == null)
-                throw new RuntimeException("KeySlice list should never be 
null.");
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, 
column_parent, predicate, start_key, finish_key, maxRows), consistency_level);
+            assert rows != null;
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
 
-        List<KeySlice> keySlices = new ArrayList<KeySlice>(colMap.size());
-        for (String key : colMap.keySet())
+        List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
+        for (Pair<String, Collection<IColumn>> row : rows)
         {
-            Collection<IColumn> dbList = colMap.get(key);
-            List<ColumnOrSuperColumn> svcList = new 
ArrayList<ColumnOrSuperColumn>(dbList.size());
-            for (org.apache.cassandra.db.IColumn col : dbList)
+            Collection<IColumn> columns = row.right;
+            List<ColumnOrSuperColumn> svcList = new 
ArrayList<ColumnOrSuperColumn>(columns.size());
+            for (org.apache.cassandra.db.IColumn col : columns)
             {
                 if (col instanceof org.apache.cassandra.db.Column)
                     svcList.add(new ColumnOrSuperColumn(new 
org.apache.cassandra.service.Column(col.name(), col.value(), col.timestamp()), 
null));
@@ -598,7 +598,7 @@
                     svcList.add(new ColumnOrSuperColumn(null, new 
org.apache.cassandra.service.SuperColumn(col.name(), subCols)));
                 }
             }
-            keySlices.add(new KeySlice(key, svcList));
+            keySlices.add(new KeySlice(row.left, svcList));
         }
 
         return keySlices;

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Sat Dec  5 00:22:31 2009
@@ -37,6 +37,7 @@
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.TimedStatsDeque;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.gms.FailureDetector;
@@ -527,20 +528,17 @@
         return rows;
     }
 
-    static Map<String, Collection<IColumn>> getRangeSlice(RangeSliceCommand 
rawCommand) throws IOException, UnavailableException, TimedOutException
+    static List<Pair<String, Collection<IColumn>>> 
getRangeSlice(RangeSliceCommand command, int consistency_level) throws 
IOException, UnavailableException, TimedOutException
     {
         long startTime = System.currentTimeMillis();
         TokenMetadata tokenMetadata = 
StorageService.instance().getTokenMetadata();
-        RangeSliceCommand command = rawCommand;
 
         InetAddress endPoint = 
StorageService.instance().findSuitableEndPoint(command.start_key);
         InetAddress startEndpoint = endPoint;
-        InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
 
-        TreeSet<Row> allRows = new TreeSet<Row>(rowComparator);
+        Map<String, ColumnFamily> rows = new HashMap<String, 
ColumnFamily>(command.max_keys);
         do
         {
-
             Message message = command.getMessage();
             if (logger.isDebugEnabled())
                 logger.debug("reading " + command + " from " + 
message.getMessageId() + "@" + endPoint);
@@ -555,44 +553,12 @@
                 throw new TimedOutException();
             }
             RangeSliceReply reply = RangeSliceReply.read(responseBody);
-            List<Row> rangeRows = new ArrayList<Row>(reply.rows);
-
-            // combine these what what has been seen so far.
-            if (rangeRows.size() > 0)
+            for (Row row : reply.rows)
             {
-                if (allRows.size() > 0)
-                {
-                    if (keyComparator.compare(rangeRows.get(rangeRows.size() - 
1).key, allRows.first().key) <= 0)
-                    {
-                        // unlikely, but possible
-                        if (rangeRows.get(rangeRows.size() - 
1).equals(allRows.first().key))
-                        {
-                            rangeRows.remove(rangeRows.size() - 1);
-                        }
-                        // put all from rangeRows into allRows.
-                        allRows.addAll(rangeRows);
-                    }
-                    else if (keyComparator.compare(allRows.last().key, 
rangeRows.get(0).key) <= 0)
-                    {
-                        // common case. deal with simple start/end key overlaps
-                        if (allRows.last().key.equals(rangeRows.get(0)))
-                        {
-                            allRows.remove(allRows.last().key);
-                        }
-                        allRows.addAll(rangeRows); // todo: check logic.
-                    }
-                    else
-                    {
-                        // deal with potential large overlap from scanning the 
first endpoint, which contains
-                        // both the smallest and largest keys
-                        allRows.addAll(rangeRows); // todo: check logic.
-                    }
-                }
-                else
-                    allRows.addAll(rangeRows); // todo: check logic.
+                rows.put(row.key, ColumnFamily.resolve(row.cf, 
rows.get(row.key)));
             }
 
-            if (allRows.size() >= rawCommand.max_keys || 
reply.rangeCompletedLocally)
+            if (rows.size() >= command.max_keys || reply.rangeCompletedLocally)
                 break;
 
             do
@@ -600,33 +566,35 @@
                 endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move 
this into the Strategies & modify for RackAwareStrategy
             }
             while (!FailureDetector.instance().isAlive(endPoint));
-            int maxResults = endPoint == wrapEndpoint ? rawCommand.max_keys : 
rawCommand.max_keys - allRows.size();
-            command = new RangeSliceCommand(command, maxResults);
         }
         while (!endPoint.equals(startEndpoint));
 
-        Map<String, Collection<IColumn>> results = new TreeMap<String, 
Collection<IColumn>>();
-        for (Row row : allRows)
+        List<Pair<String, Collection<IColumn>>> results = new 
ArrayList<Pair<String, Collection<IColumn>>>(rows.size());
+        for (Map.Entry<String, ColumnFamily> entry : rows.entrySet())
         {
-            if (row.cf == null)
-                results.put(row.key, Collections.<IColumn>emptyList());
-            else
-                results.put(row.key, row.cf.getSortedColumns());
+            ColumnFamily cf = entry.getValue();
+            Collection<IColumn> columns = (cf == null) ? 
Collections.<IColumn>emptyList() : cf.getSortedColumns();
+            results.add(new Pair<String, Collection<IColumn>>(entry.getKey(), 
columns));
         }
+        Collections.sort(results, new Comparator<Pair<String, 
Collection<IColumn>>>()
+        {
+            public int compare(Pair<String, Collection<IColumn>> o1, 
Pair<String, Collection<IColumn>> o2)
+            {
+                return keyComparator.compare(o1.left, o2.left);
+            }
+        });
         rangeStats.add(System.currentTimeMillis() - startTime);
         return results;
     }
 
-    static List<String> getKeyRange(RangeCommand rawCommand) throws 
IOException, UnavailableException, TimedOutException
+    static List<String> getKeyRange(RangeCommand command) throws IOException, 
UnavailableException, TimedOutException
     {
         long startTime = System.currentTimeMillis();
         TokenMetadata tokenMetadata = 
StorageService.instance().getTokenMetadata();
-        List<String> allKeys = new ArrayList<String>();
-        RangeCommand command = rawCommand;
+        Set<String> uniqueKeys = new HashSet<String>(command.maxResults);
 
         InetAddress endPoint = 
StorageService.instance().findSuitableEndPoint(command.startWith);
         InetAddress startEndpoint = endPoint;
-        InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
 
         do
         {
@@ -646,49 +614,9 @@
                 throw new TimedOutException();
             }
             RangeReply rangeReply = RangeReply.read(responseBody);
-            List<String> rangeKeys = rangeReply.keys;
+            uniqueKeys.addAll(rangeReply.keys);
 
-            // combine keys from most recent response with the others seen so 
far
-            if (rangeKeys.size() > 0)
-            {
-                if (allKeys.size() > 0)
-                {
-                    if (keyComparator.compare(rangeKeys.get(rangeKeys.size() - 
1), allKeys.get(0)) <= 0)
-                    {
-                        // unlikely, but possible
-                        if (rangeKeys.get(rangeKeys.size() - 
1).equals(allKeys.get(0)))
-                        {
-                            rangeKeys.remove(rangeKeys.size() - 1);
-                        }
-                        rangeKeys.addAll(allKeys);
-                        allKeys = rangeKeys;
-                    }
-                    else if (keyComparator.compare(allKeys.get(allKeys.size() 
- 1), rangeKeys.get(0)) <= 0)
-                    {
-                        // common case. deal with simple start/end key overlaps
-                        if (allKeys.get(allKeys.size() - 
1).equals(rangeKeys.get(0)))
-                        {
-                            allKeys.remove(allKeys.size() - 1);
-                        }
-                        allKeys.addAll(rangeKeys);
-                    }
-                    else
-                    {
-                        // deal with potential large overlap from scanning the 
first endpoint, which contains
-                        // both the smallest and largest keys
-                        HashSet<String> keys = new HashSet<String>(allKeys);
-                        keys.addAll(rangeKeys);
-                        allKeys = new ArrayList<String>(keys);
-                        Collections.sort(allKeys);
-                    }
-                }
-                else
-                {
-                    allKeys = rangeKeys;
-                }
-            }
-
-            if (allKeys.size() >= rawCommand.maxResults || 
rangeReply.rangeCompletedLocally)
+            if (uniqueKeys.size() >= command.maxResults || 
rangeReply.rangeCompletedLocally)
             {
                 break;
             }
@@ -700,15 +628,15 @@
             // so starting with the largest in our scan of the next node means 
we'd never see keys from the middle.
             do
             {
-                endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move 
this into the Strategies & modify for RackAwareStrategy
+                endPoint = tokenMetadata.getSuccessor(endPoint);
             } while (!FailureDetector.instance().isAlive(endPoint));
-            int maxResults = endPoint.equals(wrapEndpoint) ? 
rawCommand.maxResults : rawCommand.maxResults - allKeys.size();
-            command = new RangeCommand(command.table, command.columnFamily, 
command.startWith, command.stopAt, maxResults);
         } while (!endPoint.equals(startEndpoint));
 
         rangeStats.add(System.currentTimeMillis() - startTime);
-        return (allKeys.size() > rawCommand.maxResults)
-               ? allKeys.subList(0, rawCommand.maxResults)
+        List<String> allKeys = new ArrayList<String>(uniqueKeys);
+        Collections.sort(allKeys, keyComparator);
+        return (allKeys.size() > command.maxResults)
+               ? allKeys.subList(0, command.maxResults)
                : allKeys;
     }
 

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=887463&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java 
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat 
Dec  5 00:22:31 2009
@@ -0,0 +1,34 @@
+package org.apache.cassandra.utils;
+
+public class Pair<T1, T2>
+{
+    public final T1 left;
+    public final T2 right;
+
+    public Pair(T1 left, T2 right)
+    {
+        this.left = left;
+        this.right = right;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException("todo");
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        throw new UnsupportedOperationException("todo");
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Pair(" +
+               "left=" + left +
+               ", right=" + right +
+               ')';
+    }
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to