Author: jbellis
Date: Sat Dec 5 00:22:31 2009
New Revision: 887463
URL: http://svn.apache.org/viewvc?rev=887463&view=rev
Log:
Remove misguided attempt at optimizing the merging of range scan results from
multiple nodes
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
(with props)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
Sat Dec 5 00:22:31 2009
@@ -397,6 +397,14 @@
: DatabaseDescriptor.getSubComparator(table, columnFamilyName);
}
+ public static ColumnFamily resolve(ColumnFamily cf1, ColumnFamily cf2)
+ {
+ if (cf1 == null)
+ return cf2;
+ cf1.resolve(cf2);
+ return cf1;
+ }
+
public void resolve(ColumnFamily cf)
{
// Row _does_ allow null CF objects :( seems a necessary evil for
efficiency
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Sat Dec 5 00:22:31 2009
@@ -157,12 +157,11 @@
{
int oldSize = oldCf.size();
int oldObjectCount = oldCf.getColumnCount();
- oldCf.addAll(columnFamily);
+ oldCf.resolve(columnFamily);
int newSize = oldCf.size();
int newObjectCount = oldCf.getColumnCount();
resolveSize(oldSize, newSize);
resolveCount(oldObjectCount, newObjectCount);
- oldCf.delete(columnFamily);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
Sat Dec 5 00:22:31 2009
@@ -34,6 +34,7 @@
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.Pair;
import org.apache.thrift.TException;
import flexjson.JSONSerializer;
@@ -568,24 +569,23 @@
throw new InvalidRequestException("maxRows must be positive");
}
- Map<String, Collection<IColumn>> colMap; // keys are sorted.
+ List<Pair<String,Collection<IColumn>>> rows;
try
{
- colMap = StorageProxy.getRangeSlice(new
RangeSliceCommand(keyspace, column_parent, predicate, start_key, finish_key,
maxRows));
- if (colMap == null)
- throw new RuntimeException("KeySlice list should never be
null.");
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
column_parent, predicate, start_key, finish_key, maxRows), consistency_level);
+ assert rows != null;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- List<KeySlice> keySlices = new ArrayList<KeySlice>(colMap.size());
- for (String key : colMap.keySet())
+ List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
+ for (Pair<String, Collection<IColumn>> row : rows)
{
- Collection<IColumn> dbList = colMap.get(key);
- List<ColumnOrSuperColumn> svcList = new
ArrayList<ColumnOrSuperColumn>(dbList.size());
- for (org.apache.cassandra.db.IColumn col : dbList)
+ Collection<IColumn> columns = row.right;
+ List<ColumnOrSuperColumn> svcList = new
ArrayList<ColumnOrSuperColumn>(columns.size());
+ for (org.apache.cassandra.db.IColumn col : columns)
{
if (col instanceof org.apache.cassandra.db.Column)
svcList.add(new ColumnOrSuperColumn(new
org.apache.cassandra.service.Column(col.name(), col.value(), col.timestamp()),
null));
@@ -598,7 +598,7 @@
svcList.add(new ColumnOrSuperColumn(null, new
org.apache.cassandra.service.SuperColumn(col.name(), subCols)));
}
}
- keySlices.add(new KeySlice(key, svcList));
+ keySlices.add(new KeySlice(row.left, svcList));
}
return keySlices;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Sat Dec 5 00:22:31 2009
@@ -37,6 +37,7 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.TimedStatsDeque;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.gms.FailureDetector;
@@ -527,20 +528,17 @@
return rows;
}
- static Map<String, Collection<IColumn>> getRangeSlice(RangeSliceCommand
rawCommand) throws IOException, UnavailableException, TimedOutException
+ static List<Pair<String, Collection<IColumn>>>
getRangeSlice(RangeSliceCommand command, int consistency_level) throws
IOException, UnavailableException, TimedOutException
{
long startTime = System.currentTimeMillis();
TokenMetadata tokenMetadata =
StorageService.instance().getTokenMetadata();
- RangeSliceCommand command = rawCommand;
InetAddress endPoint =
StorageService.instance().findSuitableEndPoint(command.start_key);
InetAddress startEndpoint = endPoint;
- InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
- TreeSet<Row> allRows = new TreeSet<Row>(rowComparator);
+ Map<String, ColumnFamily> rows = new HashMap<String,
ColumnFamily>(command.max_keys);
do
{
-
Message message = command.getMessage();
if (logger.isDebugEnabled())
logger.debug("reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
@@ -555,44 +553,12 @@
throw new TimedOutException();
}
RangeSliceReply reply = RangeSliceReply.read(responseBody);
- List<Row> rangeRows = new ArrayList<Row>(reply.rows);
-
- // combine these what what has been seen so far.
- if (rangeRows.size() > 0)
+ for (Row row : reply.rows)
{
- if (allRows.size() > 0)
- {
- if (keyComparator.compare(rangeRows.get(rangeRows.size() -
1).key, allRows.first().key) <= 0)
- {
- // unlikely, but possible
- if (rangeRows.get(rangeRows.size() -
1).equals(allRows.first().key))
- {
- rangeRows.remove(rangeRows.size() - 1);
- }
- // put all from rangeRows into allRows.
- allRows.addAll(rangeRows);
- }
- else if (keyComparator.compare(allRows.last().key,
rangeRows.get(0).key) <= 0)
- {
- // common case. deal with simple start/end key overlaps
- if (allRows.last().key.equals(rangeRows.get(0)))
- {
- allRows.remove(allRows.last().key);
- }
- allRows.addAll(rangeRows); // todo: check logic.
- }
- else
- {
- // deal with potential large overlap from scanning the
first endpoint, which contains
- // both the smallest and largest keys
- allRows.addAll(rangeRows); // todo: check logic.
- }
- }
- else
- allRows.addAll(rangeRows); // todo: check logic.
+ rows.put(row.key, ColumnFamily.resolve(row.cf,
rows.get(row.key)));
}
- if (allRows.size() >= rawCommand.max_keys ||
reply.rangeCompletedLocally)
+ if (rows.size() >= command.max_keys || reply.rangeCompletedLocally)
break;
do
@@ -600,33 +566,35 @@
endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move
this into the Strategies & modify for RackAwareStrategy
}
while (!FailureDetector.instance().isAlive(endPoint));
- int maxResults = endPoint == wrapEndpoint ? rawCommand.max_keys :
rawCommand.max_keys - allRows.size();
- command = new RangeSliceCommand(command, maxResults);
}
while (!endPoint.equals(startEndpoint));
- Map<String, Collection<IColumn>> results = new TreeMap<String,
Collection<IColumn>>();
- for (Row row : allRows)
+ List<Pair<String, Collection<IColumn>>> results = new
ArrayList<Pair<String, Collection<IColumn>>>(rows.size());
+ for (Map.Entry<String, ColumnFamily> entry : rows.entrySet())
{
- if (row.cf == null)
- results.put(row.key, Collections.<IColumn>emptyList());
- else
- results.put(row.key, row.cf.getSortedColumns());
+ ColumnFamily cf = entry.getValue();
+ Collection<IColumn> columns = (cf == null) ?
Collections.<IColumn>emptyList() : cf.getSortedColumns();
+ results.add(new Pair<String, Collection<IColumn>>(entry.getKey(),
columns));
}
+ Collections.sort(results, new Comparator<Pair<String,
Collection<IColumn>>>()
+ {
+ public int compare(Pair<String, Collection<IColumn>> o1,
Pair<String, Collection<IColumn>> o2)
+ {
+ return keyComparator.compare(o1.left, o2.left);
+ }
+ });
rangeStats.add(System.currentTimeMillis() - startTime);
return results;
}
- static List<String> getKeyRange(RangeCommand rawCommand) throws
IOException, UnavailableException, TimedOutException
+ static List<String> getKeyRange(RangeCommand command) throws IOException,
UnavailableException, TimedOutException
{
long startTime = System.currentTimeMillis();
TokenMetadata tokenMetadata =
StorageService.instance().getTokenMetadata();
- List<String> allKeys = new ArrayList<String>();
- RangeCommand command = rawCommand;
+ Set<String> uniqueKeys = new HashSet<String>(command.maxResults);
InetAddress endPoint =
StorageService.instance().findSuitableEndPoint(command.startWith);
InetAddress startEndpoint = endPoint;
- InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
do
{
@@ -646,49 +614,9 @@
throw new TimedOutException();
}
RangeReply rangeReply = RangeReply.read(responseBody);
- List<String> rangeKeys = rangeReply.keys;
+ uniqueKeys.addAll(rangeReply.keys);
- // combine keys from most recent response with the others seen so
far
- if (rangeKeys.size() > 0)
- {
- if (allKeys.size() > 0)
- {
- if (keyComparator.compare(rangeKeys.get(rangeKeys.size() -
1), allKeys.get(0)) <= 0)
- {
- // unlikely, but possible
- if (rangeKeys.get(rangeKeys.size() -
1).equals(allKeys.get(0)))
- {
- rangeKeys.remove(rangeKeys.size() - 1);
- }
- rangeKeys.addAll(allKeys);
- allKeys = rangeKeys;
- }
- else if (keyComparator.compare(allKeys.get(allKeys.size()
- 1), rangeKeys.get(0)) <= 0)
- {
- // common case. deal with simple start/end key overlaps
- if (allKeys.get(allKeys.size() -
1).equals(rangeKeys.get(0)))
- {
- allKeys.remove(allKeys.size() - 1);
- }
- allKeys.addAll(rangeKeys);
- }
- else
- {
- // deal with potential large overlap from scanning the
first endpoint, which contains
- // both the smallest and largest keys
- HashSet<String> keys = new HashSet<String>(allKeys);
- keys.addAll(rangeKeys);
- allKeys = new ArrayList<String>(keys);
- Collections.sort(allKeys);
- }
- }
- else
- {
- allKeys = rangeKeys;
- }
- }
-
- if (allKeys.size() >= rawCommand.maxResults ||
rangeReply.rangeCompletedLocally)
+ if (uniqueKeys.size() >= command.maxResults ||
rangeReply.rangeCompletedLocally)
{
break;
}
@@ -700,15 +628,15 @@
// so starting with the largest in our scan of the next node means
we'd never see keys from the middle.
do
{
- endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move
this into the Strategies & modify for RackAwareStrategy
+ endPoint = tokenMetadata.getSuccessor(endPoint);
} while (!FailureDetector.instance().isAlive(endPoint));
- int maxResults = endPoint.equals(wrapEndpoint) ?
rawCommand.maxResults : rawCommand.maxResults - allKeys.size();
- command = new RangeCommand(command.table, command.columnFamily,
command.startWith, command.stopAt, maxResults);
} while (!endPoint.equals(startEndpoint));
rangeStats.add(System.currentTimeMillis() - startTime);
- return (allKeys.size() > rawCommand.maxResults)
- ? allKeys.subList(0, rawCommand.maxResults)
+ List<String> allKeys = new ArrayList<String>(uniqueKeys);
+ Collections.sort(allKeys, keyComparator);
+ return (allKeys.size() > command.maxResults)
+ ? allKeys.subList(0, command.maxResults)
: allKeys;
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=887463&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat
Dec 5 00:22:31 2009
@@ -0,0 +1,34 @@
+package org.apache.cassandra.utils;
+
+public class Pair<T1, T2>
+{
+ public final T1 left;
+ public final T2 right;
+
+ public Pair(T1 left, T2 right)
+ {
+ this.left = left;
+ this.right = right;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException("todo");
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ throw new UnsupportedOperationException("todo");
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Pair(" +
+ "left=" + left +
+ ", right=" + right +
+ ')';
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
------------------------------------------------------------------------------
svn:eol-style = native