Implement index scan in terms of StorageProxy.getRangeSlice
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b08c6757 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b08c6757 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b08c6757 Branch: refs/heads/trunk Commit: b08c6757e98d355533b5536b1ca8e44dfac8a687 Parents: 2046956 Author: Jonathan Ellis <[email protected]> Authored: Fri Jan 20 14:12:06 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Mon Jan 23 16:00:25 2012 -0600 ---------------------------------------------------------------------- interface/cassandra.thrift | 2 +- .../org/apache/cassandra/hadoop/ConfigHelper.java | 25 ++---- .../cassandra/service/IndexScanVerbHandler.java | 1 + .../org/apache/cassandra/service/StorageProxy.java | 67 --------------- .../apache/cassandra/service/StorageService.java | 2 +- .../apache/cassandra/thrift/CassandraServer.java | 17 +++- 6 files changed, 21 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/interface/cassandra.thrift ---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index a0298e5..e28b236 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -535,7 +535,7 @@ service Cassandra { /** Returns the subset of columns specified in SlicePredicate for the rows matching the IndexClause - @Deprecated; use get_range_slices instead with row_filter specified + @Deprecated; use get_range_slices instead with range.row_filter specified */ list<KeySlice> get_indexed_slices(1:required ColumnParent column_parent, 2:required IndexClause index_clause, http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 47407c0..63eec8c 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -32,6 +32,7 @@ import org.apache.cassandra.thrift.TBinaryProtocol; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Hex; import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; @@ -170,7 +171,7 @@ public class ConfigHelper */ public static void setInputSlicePredicate(Configuration conf, SlicePredicate predicate) { - conf.set(INPUT_PREDICATE_CONFIG, predicateToString(predicate)); + conf.set(INPUT_PREDICATE_CONFIG, thriftToString(predicate)); } public static SlicePredicate getInputSlicePredicate(Configuration conf) @@ -183,14 +184,14 @@ public class ConfigHelper return conf.get(INPUT_PREDICATE_CONFIG); } - private static String predicateToString(SlicePredicate predicate) + private static String thriftToString(TBase object) { - assert predicate != null; + assert object != null; // this is so awful it's kind of cool! TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); try { - return Hex.bytesToHex(serializer.serialize(predicate)); + return Hex.bytesToHex(serializer.serialize(object)); } catch (TException e) { @@ -221,7 +222,7 @@ public class ConfigHelper public static void setInputRange(Configuration conf, String startToken, String endToken) { KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken); - conf.set(INPUT_KEYRANGE_CONFIG, keyRangeToString(range)); + conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range)); } /** may be null if unset */ @@ -231,20 +232,6 @@ public class ConfigHelper return null != str ? 
keyRangeFromString(str) : null; } - private static String keyRangeToString(KeyRange keyRange) - { - assert keyRange != null; - TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); - try - { - return Hex.bytesToHex(serializer.serialize(keyRange)); - } - catch (TException e) - { - throw new RuntimeException(e); - } - } - private static KeyRange keyRangeFromString(String st) { assert st != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java index a5a1b20..7585469 100644 --- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java +++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java @@ -29,6 +29,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +@Deprecated // 1.1 implements index scan with RangeSliceVerb instead public class IndexScanVerbHandler implements IVerbHandler { private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 632f6fc..532992a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -811,10 +811,6 @@ public class StorageProxy implements StorageProxyMBean return new ReadCallback(resolver, consistencyLevel, command, endpoints); } - /* - * This function executes the read protocol locally. 
Consistency checks are performed in the background. - */ - public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException { @@ -1128,69 +1124,6 @@ public class StorageProxy implements StorageProxyMBean return writeStats.getRecentLatencyHistogramMicros(); } - public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) - throws IOException, TimeoutException, UnavailableException - { - IPartitioner p = StorageService.getPartitioner(); - - RowPosition leftPos = RowPosition.forKey(index_clause.start_key, p); - List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(new Bounds<RowPosition>(leftPos, p.getMinimumToken().minKeyBound())); - logger.debug("scan ranges are {}", StringUtils.join(ranges, ",")); - - // now scan until we have enough results - List<Row> rows = new ArrayList<Row>(index_clause.count); - for (AbstractBounds<RowPosition> range : ranges) - { - List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, range.right); - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints); - - // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints); - IReadCommand iCommand = new IReadCommand() - { - public String getKeyspace() - { - return keyspace; - } - }; - ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints); - handler.assureSufficientLiveNodes(); - - IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range); - MessageProducer producer = new CachingMessageProducer(command); - for (InetAddress endpoint : handler.endpoints) - { - 
MessagingService.instance().sendRR(producer, endpoint, handler); - if (logger.isDebugEnabled()) - logger.debug("reading {} from {}", command, endpoint); - } - - try - { - for (Row row : handler.get()) - { - rows.add(row); - logger.debug("read {}", row); - } - FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); - } - catch (TimeoutException ex) - { - if (logger.isDebugEnabled()) - logger.debug("Index scan timeout: {}", ex.toString()); - throw ex; - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); - } - if (rows.size() >= index_clause.count) - return rows.subList(0, index_clause.count); - } - - return rows; - } - public boolean getHintedHandoffEnabled() { return hintedHandoffEnabled; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 6852fe0..fd6e1e5 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -105,7 +105,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe DEFINITIONS_UPDATE, TRUNCATE, SCHEMA_CHECK, - INDEX_SCAN, + INDEX_SCAN, // Deprecated REPLICATION_FINISHED, INTERNAL_RESPONSE, // responses to internal calls COUNTER_MUTATION, http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index f28c10c..2b62632 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -736,14 
+736,20 @@ public class CassandraServer implements Cassandra.Iface ThriftValidation.validateIndexClauses(metadata, index_clause); ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ); + IPartitioner p = StorageService.getPartitioner(); + AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.forKey(index_clause.start_key, p), + p.getMinimumToken().minKeyBound()); + RangeSliceCommand command = new RangeSliceCommand(keyspace, + column_parent.column_family, + null, + column_predicate, + bounds, + index_clause.count); + List<Row> rows; try { - rows = StorageProxy.scan(keyspace, - column_parent.column_family, - index_clause, - column_predicate, - consistency_level); + rows = StorageProxy.getRangeSlice(command, consistency_level); } catch (IOException e) { @@ -754,6 +760,7 @@ public class CassandraServer implements Cassandra.Iface logger.debug("... timed out"); throw new TimedOutException(); } + return thriftifyKeySlices(rows, column_parent, column_predicate); }
