implement index scan in terms of SP.getRangeSlice

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b08c6757
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b08c6757
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b08c6757

Branch: refs/heads/trunk
Commit: b08c6757e98d355533b5536b1ca8e44dfac8a687
Parents: 2046956
Author: Jonathan Ellis <[email protected]>
Authored: Fri Jan 20 14:12:06 2012 -0600
Committer: Jonathan Ellis <[email protected]>
Committed: Mon Jan 23 16:00:25 2012 -0600

----------------------------------------------------------------------
 interface/cassandra.thrift                         |    2 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   25 ++----
 .../cassandra/service/IndexScanVerbHandler.java    |    1 +
 .../org/apache/cassandra/service/StorageProxy.java |   67 ---------------
 .../apache/cassandra/service/StorageService.java   |    2 +-
 .../apache/cassandra/thrift/CassandraServer.java   |   17 +++-
 6 files changed, 21 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index a0298e5..e28b236 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -535,7 +535,7 @@ service Cassandra {
 
   /**
     Returns the subset of columns specified in SlicePredicate for the rows matching the IndexClause
-    @Deprecated; use get_range_slices instead with row_filter specified
+    @Deprecated; use get_range_slices instead with range.row_filter specified
     */
   list<KeySlice> get_indexed_slices(1:required ColumnParent column_parent,
                                     2:required IndexClause index_clause,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 47407c0..63eec8c 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -170,7 +171,7 @@ public class ConfigHelper
      */
     public static void setInputSlicePredicate(Configuration conf, SlicePredicate predicate)
     {
-        conf.set(INPUT_PREDICATE_CONFIG, predicateToString(predicate));
+        conf.set(INPUT_PREDICATE_CONFIG, thriftToString(predicate));
     }
 
     public static SlicePredicate getInputSlicePredicate(Configuration conf)
@@ -183,14 +184,14 @@ public class ConfigHelper
         return conf.get(INPUT_PREDICATE_CONFIG);
     }
 
-    private static String predicateToString(SlicePredicate predicate)
+    private static String thriftToString(TBase object)
     {
-        assert predicate != null;
+        assert object != null;
         // this is so awful it's kind of cool!
         TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
         try
         {
-            return Hex.bytesToHex(serializer.serialize(predicate));
+            return Hex.bytesToHex(serializer.serialize(object));
         }
         catch (TException e)
         {
@@ -221,7 +222,7 @@ public class ConfigHelper
     public static void setInputRange(Configuration conf, String startToken, String endToken)
     {
         KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken);
-        conf.set(INPUT_KEYRANGE_CONFIG, keyRangeToString(range));
+        conf.set(INPUT_KEYRANGE_CONFIG, thriftToString(range));
     }
 
     /** may be null if unset */
@@ -231,20 +232,6 @@ public class ConfigHelper
         return null != str ? keyRangeFromString(str) : null;
     }
 
-    private static String keyRangeToString(KeyRange keyRange)
-    {
-        assert keyRange != null;
-        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
-        try
-        {
-            return Hex.bytesToHex(serializer.serialize(keyRange));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     private static KeyRange keyRangeFromString(String st)
     {
         assert st != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index a5a1b20..7585469 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 
+@Deprecated // 1.1 implements index scan with RangeSliceVerb instead
 public class IndexScanVerbHandler implements IVerbHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 632f6fc..532992a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -811,10 +811,6 @@ public class StorageProxy implements StorageProxyMBean
         return new ReadCallback(resolver, consistencyLevel, command, endpoints);
     }
 
-    /*
-    * This function executes the read protocol locally.  Consistency checks are performed in the background.
-    */
-
     public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
     throws IOException, UnavailableException, TimeoutException
     {
@@ -1128,69 +1124,6 @@ public class StorageProxy implements StorageProxyMBean
         return writeStats.getRecentLatencyHistogramMicros();
     }
 
-    public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
-    throws IOException, TimeoutException, UnavailableException
-    {
-        IPartitioner p = StorageService.getPartitioner();
-
-        RowPosition leftPos = RowPosition.forKey(index_clause.start_key, p);
-        List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(new Bounds<RowPosition>(leftPos, p.getMinimumToken().minKeyBound()));
-        logger.debug("scan ranges are {}", StringUtils.join(ranges, ","));
-
-        // now scan until we have enough results
-        List<Row> rows = new ArrayList<Row>(index_clause.count);
-        for (AbstractBounds<RowPosition> range : ranges)
-        {
-            List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, range.right);
-            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
-
-            // collect replies and resolve according to consistency level
-            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
-            IReadCommand iCommand = new IReadCommand()
-            {
-                public String getKeyspace()
-                {
-                    return keyspace;
-                }
-            };
-            ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
-            handler.assureSufficientLiveNodes();
-
-            IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
-            MessageProducer producer = new CachingMessageProducer(command);
-            for (InetAddress endpoint : handler.endpoints)
-            {
-                MessagingService.instance().sendRR(producer, endpoint, handler);
-                if (logger.isDebugEnabled())
-                    logger.debug("reading {} from {}", command, endpoint);
-            }
-
-            try
-            {
-                for (Row row : handler.get())
-                {
-                    rows.add(row);
-                    logger.debug("read {}", row);
-                }
-                FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
-            }
-            catch (TimeoutException ex)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Index scan timeout: {}", ex.toString());
-                throw ex;
-            }
-            catch (DigestMismatchException e)
-            {
-                throw new AssertionError(e);
-            }
-            if (rows.size() >= index_clause.count)
-                return rows.subList(0, index_clause.count);
-        }
-
-        return rows;
-    }
-
     public boolean getHintedHandoffEnabled()
     {
         return hintedHandoffEnabled;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 6852fe0..fd6e1e5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -105,7 +105,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         DEFINITIONS_UPDATE,
         TRUNCATE,
         SCHEMA_CHECK,
-        INDEX_SCAN,
+        INDEX_SCAN, // Deprecated
         REPLICATION_FINISHED,
         INTERNAL_RESPONSE, // responses to internal calls
         COUNTER_MUTATION,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b08c6757/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index f28c10c..2b62632 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -736,14 +736,20 @@ public class CassandraServer implements Cassandra.Iface
         ThriftValidation.validateIndexClauses(metadata, index_clause);
         ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
 
+        IPartitioner p = StorageService.getPartitioner();
+        AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.forKey(index_clause.start_key, p),
+                                                                     p.getMinimumToken().minKeyBound());
+        RangeSliceCommand command = new RangeSliceCommand(keyspace,
+                                                          column_parent.column_family,
+                                                          null,
+                                                          column_predicate,
+                                                          bounds,
+                                                          index_clause.count);
+
         List<Row> rows;
         try
         {
-            rows = StorageProxy.scan(keyspace,
-                                     column_parent.column_family,
-                                     index_clause,
-                                     column_predicate,
-                                     consistency_level);
+            rows = StorageProxy.getRangeSlice(command, consistency_level);
         }
         catch (IOException e)
         {
@@ -754,6 +760,7 @@ public class CassandraServer implements Cassandra.Iface
             logger.debug("... timed out");
             throw new TimedOutException();
         }
+
         return thriftifyKeySlices(rows, column_parent, column_predicate);
     }
 

Reply via email to