http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3aab12f..6696e10 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -29,14 +29,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -62,22 +61,15 @@ public abstract class AbstractReadExecutor
 
     protected final ReadCommand command;
     protected final List<InetAddress> targetReplicas;
-    protected final RowDigestResolver resolver;
-    protected final ReadCallback<ReadResponse, Row> handler;
+    protected final ReadCallback handler;
     protected final TraceState traceState;
 
-    AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
-        resolver = new RowDigestResolver(command.ksName, command.key, targetReplicas.size());
-        traceState = Tracing.instance.get();
-        handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
-    }
-
-    private static boolean isLocalRequest(InetAddress replica)
-    {
-        return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS;
+        this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas);
+        this.traceState = Tracing.instance.get();
     }
 
     protected void makeDataRequests(Iterable<InetAddress> endpoints)
@@ -98,7 +90,7 @@ public abstract class AbstractReadExecutor
 
         for (InetAddress endpoint : endpoints)
         {
-            if (isLocalRequest(endpoint))
+            if (StorageProxy.canDoLocalRequest(endpoint))
             {
                 hasLocalEndpoint = true;
                 continue;
@@ -142,7 +134,7 @@ public abstract class AbstractReadExecutor
      * wait for an answer.  Blocks until success or timeout, so it is caller's
      * responsibility to call maybeTryAdditionalReplicas first.
      */
-    public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
         return handler.get();
     }
@@ -150,11 +142,11 @@ public abstract class AbstractReadExecutor
     /**
      * @return an executor appropriate for the configured speculative read policy
      */
-    public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
+    public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
-        Keyspace keyspace = Keyspace.open(command.ksName);
-        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.key);
-        ReadRepairDecision repairDecision = Schema.instance.getCFMetaData(command.ksName, command.cfName).newReadRepairDecision();
+        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+        ReadRepairDecision repairDecision = command.metadata().newReadRepairDecision();
         List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
 
         // Throw UAE early if we don't have enough replicas.
@@ -166,19 +158,19 @@ public abstract class AbstractReadExecutor
             ReadRepairMetrics.attempted.mark();
         }
 
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
         RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
 
         // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
         if (retryType == RetryType.NONE || consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
+            return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas);
 
         if (targetReplicas.size() == allReplicas.size())
         {
             // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
             // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
             // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
-            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
         }
 
         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
@@ -199,16 +191,16 @@ public abstract class AbstractReadExecutor
         targetReplicas.add(extraReplica);
 
         if (retryType == RetryType.ALWAYS)
-            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
     }
 
     private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public NeverSpeculatingReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
         }
 
         public void executeAsync()
@@ -234,12 +226,13 @@ public abstract class AbstractReadExecutor
         private final ColumnFamilyStore cfs;
         private volatile boolean speculated = false;
 
-        public SpeculatingReadExecutor(ColumnFamilyStore cfs,
+        public SpeculatingReadExecutor(Keyspace keyspace,
+                                       ColumnFamilyStore cfs,
                                        ReadCommand command,
                                        ConsistencyLevel consistencyLevel,
                                        List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
             this.cfs = cfs;
         }
 
@@ -278,7 +271,7 @@ public abstract class AbstractReadExecutor
             {
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
-                if (resolver.getData() != null)
+                if (handler.resolver.isDataPresent())
                     retryCommand = command.copy().setIsDigestQuery(true);
 
                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
@@ -304,12 +297,13 @@ public abstract class AbstractReadExecutor
     {
         private final ColumnFamilyStore cfs;
 
-        public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
+        public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
+                                             ColumnFamilyStore cfs,
                                              ReadCommand command,
                                              ConsistencyLevel consistencyLevel,
                                              List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
             this.cfs = cfs;
         }
 

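For context, a minimal sketch (not part of this commit) of the call pattern a coordinator uses
against the reworked executor; the try-with-resources use of PartitionIterator and RowIterator
is an assumption based on how they are consumed elsewhere in this patch:

    SinglePartitionReadCommand command = ...; // the single-partition query to serve
    AbstractReadExecutor executor = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
    executor.executeAsync();               // send the data request plus digest requests
    executor.maybeTryAdditionalReplicas(); // speculate per the table's speculative_retry setting
    try (PartitionIterator partitions = executor.get()) // blocks; may throw DigestMismatchException
    {
        while (partitions.hasNext())
        {
            try (RowIterator rows = partitions.next())
            {
                // consume the reconciled rows of one partition
            }
        }
    }
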
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
deleted file mode 100644
index f362047..0000000
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.concurrent.Accumulator;
-
-public abstract class AbstractRowResolver implements IResponseResolver<ReadResponse, Row>
-{
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
-
-    protected final String keyspaceName;
-    // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
-    protected final Accumulator<MessageIn<ReadResponse>> replies;
-    protected final DecoratedKey key;
-
-    public AbstractRowResolver(ByteBuffer key, String keyspaceName, int maxResponseCount)
-    {
-        this.key = StorageService.getPartitioner().decorateKey(key);
-        this.keyspaceName = keyspaceName;
-        this.replies = new Accumulator<>(maxResponseCount);
-    }
-
-    public void preprocess(MessageIn<ReadResponse> message)
-    {
-        replies.add(message);
-    }
-
-    public Iterable<MessageIn<ReadResponse>> getMessages()
-    {
-        return replies;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index 6ac765b..dec5319 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -29,11 +29,11 @@ import org.apache.cassandra.utils.WrappedRunnable;
 
 public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
 {
-    private final RowDataResolver repairResolver;
+    private final DataResolver repairResolver;
     private final int blockfor;
     protected final AtomicInteger received = new AtomicInteger(0);
 
-    public AsyncRepairCallback(RowDataResolver repairResolver, int blockfor)
+    public AsyncRepairCallback(DataResolver repairResolver, int blockfor)
     {
         this.repairResolver = repairResolver;
         this.blockfor = blockfor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/CASRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 3d86637..1db100d 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,8 +17,9 @@
  */
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 /**
@@ -27,19 +28,19 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 public interface CASRequest
 {
     /**
-     * The filter to use to fetch the value to compare for the CAS.
+     * The command to use to fetch the value to compare for the CAS.
      */
-    public IDiskAtomFilter readFilter();
+    public SinglePartitionReadCommand readCommand(int nowInSec);
 
     /**
      * Returns whether the provided CF, which represents the values fetched using the
      * readFilter(), matches the CAS conditions this object stands for.
      */
-    public boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
+    public boolean appliesTo(FilteredPartition current) throws InvalidRequestException;
 
     /**
      * The updates to perform on a CAS success. The values fetched using the readFilter()
      * are passed as argument.
      */
-    public ColumnFamily makeUpdates(ColumnFamily current) throws InvalidRequestException;
+    public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException;
 }

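To make the reworked contract concrete, a hedged sketch of an "apply only if the partition has
no rows" request; the rowIterator() emptiness check and the constructor shape are illustrative
assumptions, not code from this commit:

    class IfNotExistsRequest implements CASRequest
    {
        private final CFMetaData metadata;       // table being CAS'd
        private final DecoratedKey key;          // partition under the CAS
        private final PartitionUpdate insertion; // what to write on success

        IfNotExistsRequest(CFMetaData metadata, DecoratedKey key, PartitionUpdate insertion)
        {
            this.metadata = metadata;
            this.key = key;
            this.insertion = insertion;
        }

        public SinglePartitionReadCommand readCommand(int nowInSec)
        {
            // fetch the current partition state to evaluate the condition against
            return SinglePartitionReadCommand.fullPartitionRead(metadata, nowInSec, key);
        }

        public boolean appliesTo(FilteredPartition current)
        {
            // condition holds when the read returned no live rows
            // (emptiness accessor assumed for illustration)
            return !current.rowIterator().hasNext();
        }

        public PartitionUpdate makeUpdates(FilteredPartition current)
        {
            return insertion; // the values read are available here if the update depends on them
        }
    }
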
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index a775627..e82e8a4 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -43,17 +43,20 @@ import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition;
+import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class CacheService implements CacheServiceMBean
 {
@@ -362,24 +365,45 @@ public class CacheService implements CacheServiceMBean
         public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
-            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
+            final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
             {
                 public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
                 {
-                    DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
-                    QueryFilter filter = QueryFilter.getNamesFilter(key,
-                                                                    cfs.metadata.cfName,
-                                                                    FBUtilities.singleton(cellName, cfs.metadata.comparator),
-                                                                    Long.MIN_VALUE);
-                    ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
-                    if (cf == null)
-                        return null;
-                    Cell cell = cf.getColumn(cellName);
-                    if (cell == null || !cell.isLive(Long.MIN_VALUE))
-                        return null;
-                    ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                    return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
+                    LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
+                    ColumnDefinition column = name.column;
+                    CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+
+                    int nowInSec = FBUtilities.nowInSeconds();
+                    ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+                    if (path == null)
+                        builder.add(column);
+                    else
+                        builder.select(column, path);
+
+                    ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(name.clustering, cfs.metadata.comparator), false);
+                    SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
+                    try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
+                    {
+                        Cell cell;
+                        if (column.isStatic())
+                        {
+                            cell = iter.staticRow().getCell(column);
+                        }
+                        else
+                        {
+                            if (!iter.hasNext())
+                                return null;
+                            cell = iter.next().getCell(column);
+                        }
+
+                        if (cell == null)
+                            return null;
+
+                        ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
+                        return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, name.clustering, column, path), clockAndCount);
+                    }
                 }
             });
         }
@@ -395,14 +419,19 @@ public class CacheService implements CacheServiceMBean
         public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+            final int rowsToCache = cfs.metadata.getCaching().rowCache.rowsToCache;
+
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
             {
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-                    QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
-                    ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
-                    return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
+                    int nowInSec = FBUtilities.nowInSeconds();
+                    try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
+                    {
+                        CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
+                        return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache);
+                    }
                 }
             });
         }
@@ -423,7 +452,7 @@ public class CacheService implements CacheServiceMBean
             ByteBufferUtil.writeWithLength(key.key, out);
             out.writeInt(key.desc.generation);
             out.writeBoolean(true);
-            key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, out);
+            key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out);
         }
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
@@ -443,7 +472,10 @@ public class CacheService implements CacheServiceMBean
                 RowIndexEntry.Serializer.skipPromotedIndex(input);
                 return null;
             }
-            RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
+            RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
+                                                                                                                reader.descriptor.version,
+                                                                                                                SerializationHeader.forKeyCache(cfs.metadata));
+            RowIndexEntry entry = indexSerializer.deserialize(input);
             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
         }
 

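One behavioral note on the row-cache loader above: it now materializes only the head of each
partition. A hedged sketch of that capping step in isolation, reusing names from the diff (the
readPartition() source of rows is hypothetical):

    int nowInSec = FBUtilities.nowInSeconds();
    int rowsToCache = cfs.metadata.getCaching().rowCache.rowsToCache;
    try (UnfilteredRowIterator iter = readPartition()) // hypothetical: the partition's merged rows
    {
        // DataLimits.cqlLimits(rowsToCache) stops iteration after that many CQL rows,
        // so the cache entry holds only the first rows of the partition.
        CachedPartition head = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
    }
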
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
new file mode 100644
index 0000000..b2d1954
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.net.AsyncOneResponse;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DataResolver extends ResponseResolver
+{
+    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<AsyncOneResponse>());
+
+    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        super(keyspace, command, consistency, maxResponseCount);
+    }
+
+    public PartitionIterator getData()
+    {
+        ReadResponse response = responses.iterator().next().payload;
+        return UnfilteredPartitionIterators.filter(response.makeIterator(), command.nowInSec());
+    }
+
+    public PartitionIterator resolve()
+    {
+        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
+        // at the beginning of this method), so grab the response count once and use that through the method.
+        int count = responses.size();
+        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
+        InetAddress[] sources = new InetAddress[count];
+        for (int i = 0; i < count; i++)
+        {
+            MessageIn<ReadResponse> msg = responses.get(i);
+            iters.add(msg.payload.makeIterator());
+            sources[i] = msg.from;
+        }
+
+        // Even though every response should honor the limit, we might have more than requested post-reconciliation,
+        // so ensure we're respecting the limit.
+        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true);
+        return new CountingPartitionIterator(mergeWithShortReadProtection(iters, sources, counter), counter);
+    }
+
+    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    {
+        // If we have only one result, there is no read repair to do and we can't get short reads
+        if (results.size() == 1)
+            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+
+        UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
+
+        // So-called "short reads" stem from nodes returning only a subset of the results they have for a partition due to the limit,
+        // but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
+        if (command.limits().isUnlimited())
+            return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+
+        for (int i = 0; i < results.size(); i++)
+            results.set(i, new ShortReadProtectedIterator(sources[i], results.get(i), resultCounter));
+
+        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+    }
+
+    private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
+    {
+        private final InetAddress[] sources;
+
+        public RepairMergeListener(InetAddress[] sources)
+        {
+            this.sources = sources;
+        }
+
+        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+        {
+            return new MergeListener(partitionKey, columns(versions), isReversed(versions));
+        }
+        }
+
+        private PartitionColumns columns(List<UnfilteredRowIterator> versions)
+        {
+            Columns statics = Columns.NONE;
+            Columns regulars = Columns.NONE;
+            for (UnfilteredRowIterator iter : versions)
+            {
+                if (iter == null)
+                    continue;
+
+                PartitionColumns cols = iter.columns();
+                statics = statics.mergeTo(cols.statics);
+                regulars = regulars.mergeTo(cols.regulars);
+            }
+            return new PartitionColumns(statics, regulars);
+        }
+
+        private boolean isReversed(List<UnfilteredRowIterator> versions)
+        {
+            assert !versions.isEmpty();
+            // Everything will be in the same order
+            return versions.get(0).isReverseOrder();
+        }
+
+        public void close()
+        {
+            try
+            {
+                FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+            }
+            catch (TimeoutException ex)
+            {
+                // We got all responses, but timed out while repairing
+                int blockFor = consistency.blockFor(keyspace);
+                if (Tracing.isTracing())
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                else
+                    logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
+
+                throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
+            }
+        }
+
+        private class MergeListener implements UnfilteredRowIterators.MergeListener
+        {
+            private final DecoratedKey partitionKey;
+            private final PartitionColumns columns;
+            private final boolean isReversed;
+            private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length];
+
+            private final Row.Writer[] currentRows = new Row.Writer[sources.length];
+            private Clustering currentClustering;
+            private ColumnDefinition currentColumn;
+
+            private final Slice.Bound[] markerOpen = new Slice.Bound[sources.length];
+            private final DeletionTime[] markerTime = new DeletionTime[sources.length];
+
+            public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
+            {
+                this.partitionKey = partitionKey;
+                this.columns = columns;
+                this.isReversed = isReversed;
+            }
+
+            private PartitionUpdate update(int i)
+            {
+                PartitionUpdate upd = repairs[i];
+                if (upd == null)
+                {
+                    upd = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
+                    repairs[i] = upd;
+                }
+                return upd;
+            }
+
+            private Row.Writer currentRow(int i)
+            {
+                Row.Writer row = currentRows[i];
+                if (row == null)
+                {
+                    row = currentClustering == Clustering.STATIC_CLUSTERING ? update(i).staticWriter() : update(i).writer();
+                    currentClustering.writeTo(row);
+                    currentRows[i] = row;
+                }
+                return row;
+            }
+
+            public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    DeletionTime version = versions[i];
+                    if (mergedDeletion.supersedes(versions[i]))
+                        update(i).addPartitionDeletion(mergedDeletion);
+                }
+            }
+
+            public void onMergingRows(Clustering clustering,
+                                      LivenessInfo mergedInfo,
+                                      DeletionTime mergedDeletion,
+                                      Row[] versions)
+            {
+                currentClustering = clustering;
+                for (int i = 0; i < versions.length; i++)
+                {
+                    Row version = versions[i];
+
+                    if (version == null || mergedInfo.supersedes(version.primaryKeyLivenessInfo()))
+                        currentRow(i).writePartitionKeyLivenessInfo(mergedInfo);
+
+                    if (version == null || mergedDeletion.supersedes(version.deletion()))
+                        currentRow(i).writeRowDeletion(mergedDeletion);
+                }
+            }
+
+            public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+            {
+                currentColumn = c;
+                for (int i = 0; i < versions.length; i++)
+                {
+                    DeletionTime version = versions[i] == null ? DeletionTime.LIVE : versions[i];
+                    if (mergedCompositeDeletion.supersedes(version))
+                        currentRow(i).writeComplexDeletion(c, mergedCompositeDeletion);
+                }
+            }
+
+            public void onMergedCells(Cell mergedCell, Cell[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    Cell version = versions[i];
+                    Cell toAdd = version == null ? mergedCell : Cells.diff(mergedCell, version);
+                    if (toAdd != null)
+                        toAdd.writeTo(currentRow(i));
+                }
+            }
+
+            public void onRowDone()
+            {
+                for (int i = 0; i < currentRows.length; i++)
+                {
+                    if (currentRows[i] != null)
+                        currentRows[i].endOfRow();
+                }
+                Arrays.fill(currentRows, null);
+            }
+
+            public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    RangeTombstoneMarker marker = versions[i];
+                    // Note that boundaries are both close and open, so it's not one or the other
+                    if (merged.isClose(isReversed) && markerOpen[i] != null)
+                    {
+                        Slice.Bound open = markerOpen[i];
+                        Slice.Bound close = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingCloseBound(isReversed).clustering() : merged.clustering();
+                        update(i).addRangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]);
+                    }
+                    if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
+                    {
+                        markerOpen[i] = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingOpenBound(isReversed).clustering() : merged.clustering();
+                        markerTime[i] = merged.openDeletionTime(isReversed);
+                    }
+                }
+            }
+
+            public void close()
+            {
+                for (int i = 0; i < repairs.length; i++)
+                {
+                    if (repairs[i] == null)
+                        continue;
+
+                    // use a separate verb here because we don't want these to get the white glove hint-
+                    // on-timeout behavior that a "real" mutation gets
+                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
+                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
+                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                }
+            }
+        }
+    }
+
+    private class ShortReadProtectedIterator extends CountingUnfilteredPartitionIterator
+    {
+        private final InetAddress source;
+        private final DataLimits.Counter postReconciliationCounter;
+
+        private ShortReadProtectedIterator(InetAddress source, UnfilteredPartitionIterator iterator, DataLimits.Counter postReconciliationCounter)
+        {
+            super(iterator, command.limits().newCounter(command.nowInSec(), false));
+            this.source = source;
+            this.postReconciliationCounter = postReconciliationCounter;
+        }
+
+        @Override
+        public UnfilteredRowIterator next()
+        {
+            return new ShortReadProtectedRowIterator(super.next());
+        }
+
+        private class ShortReadProtectedRowIterator extends WrappingUnfilteredRowIterator
+        {
+            private boolean initialReadIsDone;
+            private UnfilteredRowIterator shortReadContinuation;
+            private Clustering lastClustering;
+
+            ShortReadProtectedRowIterator(UnfilteredRowIterator iter)
+            {
+                super(iter);
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                if (super.hasNext())
+                    return true;
+
+                initialReadIsDone = true;
+
+                if (shortReadContinuation != null && shortReadContinuation.hasNext())
+                    return true;
+
+                return checkForShortRead();
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                Unfiltered next = initialReadIsDone ? shortReadContinuation.next() : super.next();
+
+                if (next.kind() == Unfiltered.Kind.ROW)
+                    lastClustering = ((Row)next).clustering();
+
+                return next;
+            }
+
+            @Override
+            public void close()
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    if (shortReadContinuation != null)
+                        shortReadContinuation.close();
+                }
+            }
+
+            private boolean checkForShortRead()
+            {
+                assert shortReadContinuation == null || !shortReadContinuation.hasNext();
+
+                // We have a short read if the node this is the result of has returned the requested number of
+                // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
+                // those results haven't made it in the final result post-reconciliation due to other nodes'
+                // tombstones. If that is the case, then the node might have more results that we should fetch,
+                // as otherwise we might return fewer results than required, or results that shouldn't be returned
+                // (because the node has tombstones that hide future results from other nodes but that haven't
+                // been returned due to the limit).
+                // Also note that we only get here once all the results for this node have been returned, and so
+                // if the node had returned the requested number but we still get here, it implies some results were
+                // skipped during reconciliation.
+                if (!counter.isDoneForPartition())
+                    return false;
+
+                assert !postReconciliationCounter.isDoneForPartition();
+
+                // We need to try to query enough additional results to fulfill our query, but because we could still
+                // get short reads on that additional query, just querying the number of results we miss may not be
+                // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition()), only
+                // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result.
+                // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
+                // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
+                // Also note that it's ok if we retrieve more results than necessary since our top-level iterator is a
+                // counting iterator.
+                int n = postReconciliationCounter.countedInCurrentPartition();
+                int x = counter.countedInCurrentPartition();
+                int toQuery = x == 0
+                              ? n * 2     // We didn't get any answer, so (somewhat arbitrarily) ask for twice as much
+                              : Math.max(((n * n) / x) - n, 1);
+
+                DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
+                ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey());
+                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata().comparator, lastClustering, false);
+                SinglePartitionReadCommand<?> cmd = SinglePartitionReadCommand.create(command.metadata(),
+                                                                                      command.nowInSec(),
+                                                                                      command.columnFilter(),
+                                                                                      command.rowFilter(),
+                                                                                      retryLimits,
+                                                                                      partitionKey(),
+                                                                                      retryFilter);
+
+                shortReadContinuation = doShortReadRetry(cmd);
+                return shortReadContinuation.hasNext();
+            }
+
+            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand<?> retryCommand)
+            {
+                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
+                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+                if (StorageProxy.canDoLocalRequest(source))
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                else
+                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
+
+                // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+                handler.awaitResults();
+                assert resolver.responses.size() == 1;
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(), retryCommand);
+            }
+        }
+    }
+
+    public boolean isDataPresent()
+    {
+        return !responses.isEmpty();
+    }
+}

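A worked instance of the retry estimate in checkForShortRead() above: if a node answered
n = 100 rows for a partition but only x = 20 of them made it into the reconciled result, the
surviving ratio is x/n = 0.2, so to cover the n - x = 80 missing rows the retry asks for
m = (100 * 100 / 20) - 100 = 400 more rows; any over-fetch is harmless because the top-level
counting iterator still enforces the original limit.
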
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
new file mode 100644
index 0000000..12b0626
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.net.MessageIn;
+
+public class DigestResolver extends ResponseResolver
+{
+    private volatile ReadResponse dataResponse;
+
+    public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        super(keyspace, command, consistency, maxResponseCount);
+    }
+
+    @Override
+    public void preprocess(MessageIn<ReadResponse> message)
+    {
+        super.preprocess(message);
+        if (dataResponse == null && !message.payload.isDigestQuery())
+            dataResponse = message.payload;
+    }
+
+    /**
+     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
+     */
+    public PartitionIterator getData()
+    {
+        assert isDataPresent();
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+    }
+
+    /*
+     * This method handles two different scenarios:
+     *
+     * a) we're handling the initial read of data from the closest replica + digests
+     *    from the rest. In this case we check the digests against each other,
+     *    throw an exception if there is a mismatch, otherwise return the data row.
+     *
+     * b) we're checking additional digests that arrived after the minimum to handle
+     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
+     */
+    public PartitionIterator resolve() throws DigestMismatchException
+    {
+        if (responses.size() == 1)
+            return getData();
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolving {} responses", responses.size());
+
+        long start = System.nanoTime();
+
+        // validate digests against each other; throw immediately on mismatch.
+        ByteBuffer digest = null;
+        for (MessageIn<ReadResponse> message : responses)
+        {
+            ReadResponse response = message.payload;
+
+            ByteBuffer newDigest = response.digest();
+            if (digest == null)
+                digest = newDigest;
+            else if (!digest.equals(newDigest))
+                // rely on the fact that only single partition queries use digests
+                throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest);
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: {} ms.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+    }
+
+    public boolean isDataPresent()
+    {
+        return dataResponse != null;
+    }
+}

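A hedged sketch (not from this commit) of how a coordinator would drive the resolver for
scenario (a) above; the receivedResponses collection is hypothetical:

    DigestResolver resolver = new DigestResolver(keyspace, command, consistency, replicaCount);
    for (MessageIn<ReadResponse> response : receivedResponses) // one data response, the rest digests
        resolver.preprocess(response);

    try (PartitionIterator result = resolver.resolve()) // throws DigestMismatchException on any digest conflict
    {
        // serve the data; on a mismatch the caller falls back to full data
        // reads from the contacted replicas, resolved through DataResolver
    }
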
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/IReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IReadCommand.java b/src/java/org/apache/cassandra/service/IReadCommand.java
deleted file mode 100644
index c6a129e..0000000
--- a/src/java/org/apache/cassandra/service/IReadCommand.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public interface IReadCommand
-{
-    public String getKeyspace();
-    public long getTimeout();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
deleted file mode 100644
index 17c8bff..0000000
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.apache.cassandra.net.MessageIn;
-
-public interface IResponseResolver<TMessage, TResolved> {
-
-    /**
-     * This Method resolves the responses that are passed in . for example : if
-     * its write response then all we get is true or false return values which
-     * implies if the writes were successful but for reads its more complicated
-     * you need to look at the responses and then based on differences schedule
-     * repairs . Hence you need to derive a response resolver based on your
-     * needs from this interface.
-     */
-    public TResolved resolve() throws DigestMismatchException;
-
-    public boolean isDataPresent();
-
-    /**
-     * returns the data response without comparing with any digests
-     */
-    public TResolved getData();
-
-    public void preprocess(MessageIn<TMessage> message);
-    public Iterable<MessageIn<TMessage>> getMessages();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
deleted file mode 100644
index 640681b..0000000
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
- * to the most recent ColumnFamily and setting up read repairs as necessary.
- */
-public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceReply, Iterable<Row>>
-{
-    private static final Comparator<Pair<Row,InetAddress>> pairComparator = new Comparator<Pair<Row, InetAddress>>()
-    {
-        public int compare(Pair<Row, InetAddress> o1, Pair<Row, InetAddress> o2)
-        {
-            return o1.left.key.compareTo(o2.left.key);
-        }
-    };
-
-    private final String keyspaceName;
-    private final long timestamp;
-    private List<InetAddress> sources;
-    protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
-    public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>();
-
-    public RangeSliceResponseResolver(String keyspaceName, long timestamp)
-    {
-        this.keyspaceName = keyspaceName;
-        this.timestamp = timestamp;
-    }
-
-    public void setSources(List<InetAddress> endpoints)
-    {
-        this.sources = endpoints;
-    }
-
-    public List<Row> getData()
-    {
-        MessageIn<RangeSliceReply> response = responses.iterator().next();
-        return response.payload.rows;
-    }
-
-    // Note: this would deserialize the response a 2nd time if getData was called first.
-    // (this is not currently an issue since we don't do read repair for range queries.)
-    public Iterable<Row> resolve()
-    {
-        ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size());
-        int n = 0;
-        for (MessageIn<RangeSliceReply> response : responses)
-        {
-            RangeSliceReply reply = response.payload;
-            n = Math.max(n, reply.rows.size());
-            iters.add(new RowIterator(reply.rows.iterator(), response.from));
-        }
-        // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        // TODO do we need to call close?
-        CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
-
-        List<Row> resolvedRows = new ArrayList<Row>(n);
-        while (iter.hasNext())
-            resolvedRows.add(iter.next());
-
-        return resolvedRows;
-    }
-
-    public void preprocess(MessageIn message)
-    {
-        responses.add(message);
-    }
-
-    public boolean isDataPresent()
-    {
-        return !responses.isEmpty();
-    }
-
-    private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>> implements CloseableIterator<Pair<Row,InetAddress>>
-    {
-        private final Iterator<Row> iter;
-        private final InetAddress source;
-
-        private RowIterator(Iterator<Row> iter, InetAddress source)
-        {
-            this.iter = iter;
-            this.source = source;
-        }
-
-        protected Pair<Row,InetAddress> computeNext()
-        {
-            return iter.hasNext() ? Pair.create(iter.next(), source) : endOfData();
-        }
-
-        public void close() {}
-    }
-
-    public Iterable<MessageIn<RangeSliceReply>> getMessages()
-    {
-        return responses;
-    }
-
-    private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, Row>
-    {
-        List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
-        List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
-        DecoratedKey key;
-
-        public void reduce(Pair<Row,InetAddress> current)
-        {
-            key = current.left.key;
-            versions.add(current.left.cf);
-            versionSources.add(current.right);
-        }
-
-        protected Row getReduced()
-        {
-            ColumnFamily resolved = versions.size() > 1
-                                  ? RowDataResolver.resolveSuperset(versions, timestamp)
-                                  : versions.get(0);
-            if (versions.size() < sources.size())
-            {
-                // add placeholder rows for sources that didn't have any data, so maybeScheduleRepairs sees them
-                for (InetAddress source : sources)
-                {
-                    if (!versionSources.contains(source))
-                    {
-                        versions.add(null);
-                        versionSources.add(source);
-                    }
-                }
-            }
-            // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet
-            if (resolved != null)
-                repairResults.addAll(RowDataResolver.scheduleRepairs(resolved, keyspaceName, key, versions, versionSources));
-            versions.clear();
-            versionSources.clear();
-            return new Row(key, resolved);
-        }
-    }
-}
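
The resolver deleted above is, at its core, a k-way merge of per-replica, key-sorted row lists followed by a per-key reduce. A minimal standalone sketch of that merge-and-reduce pattern follows (illustrative names only, Java 16+; this is not Cassandra's MergeIterator API, and "greatest payload wins" stands in for timestamp-based reconciliation):

    import java.util.*;
    import java.util.stream.*;

    public class MergeReduceSketch
    {
        // One replica's view of a row: a key plus an opaque payload version.
        record Version(String key, String payload) {}

        static List<Version> resolve(List<List<Version>> perReplica)
        {
            // Flatten and sort by key so all versions of a row are adjacent
            // (the deleted code k-way merges already-sorted streams instead).
            List<Version> all = perReplica.stream()
                                          .flatMap(List::stream)
                                          .sorted(Comparator.comparing(Version::key))
                                          .collect(Collectors.toList());

            List<Version> resolved = new ArrayList<>();
            for (int i = 0; i < all.size(); )
            {
                int j = i;
                while (j < all.size() && all.get(j).key().equals(all.get(i).key()))
                    j++;
                // "Reduce": keep the greatest payload as a stand-in for
                // most-recent-timestamp-wins reconciliation.
                resolved.add(all.subList(i, j).stream()
                                .max(Comparator.comparing(Version::payload))
                                .get());
                i = j;
            }
            return resolved;
        }

        public static void main(String[] args)
        {
            List<Version> replicaA = List.of(new Version("k1", "v1"), new Version("k2", "v2"));
            List<Version> replicaB = List.of(new Version("k1", "v3"));
            // Prints: [Version[key=k1, payload=v3], Version[key=k2, payload=v2]]
            System.out.println(resolve(List.of(replicaA, replicaB)));
        }
    }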

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
deleted file mode 100644
index 0f3726c..0000000
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.apache.cassandra.db.AbstractRangeCommand;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.tracing.Tracing;
-
-public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
-{
-    public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
-    {
-        if (StorageService.instance.isBootstrapMode())
-        {
-            /* Don't service reads! */
-            throw new RuntimeException("Cannot service reads while bootstrapping!");
-        }
-        RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
-        Tracing.trace("Enqueuing response to {}", message.from);
-        MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
-    }
-}
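
The deleted verb handler is small but shows the standard shape of a read-side handler: guard, execute locally, reply. A standalone sketch under assumed, illustrative types (none of these stand-ins are Cassandra's IVerbHandler or MessagingService):

    public class RangeReadHandlerSketch
    {
        private final boolean bootstrapping;

        public RangeReadHandlerSketch(boolean bootstrapping)
        {
            this.bootstrapping = bootstrapping;
        }

        public void doVerb(String payload, int id, String from)
        {
            // Guard: a bootstrapping node has incomplete data, so it must refuse reads.
            if (bootstrapping)
                throw new RuntimeException("Cannot service reads while bootstrapping!");

            // Execute the command locally, then reply to the coordinator (stubbed).
            String reply = "rows-for:" + payload;
            System.out.printf("reply to %s (id %d): %s%n", from, id, reply);
        }

        public static void main(String[] args)
        {
            new RangeReadHandlerSketch(false).doVerb("range[a,b)", 42, "10.0.0.1");
        }
    }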

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 0c008e7..d548019 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -46,16 +46,16 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
-public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFailure<TMessage>
+public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
-    public final IResponseResolver<TMessage, TResolved> resolver;
+    public final ResponseResolver resolver;
     private final SimpleCondition condition = new SimpleCondition();
-    final long start;
+    private final long start;
     final int blockfor;
     final List<InetAddress> endpoints;
-    private final IReadCommand command;
+    private final ReadCommand command;
     private final ConsistencyLevel consistencyLevel;
     private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
@@ -69,14 +69,17 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> filteredEndpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints)
     {
-        this(resolver, consistencyLevel, consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())), command, Keyspace.open(command.getKeyspace()), filteredEndpoints);
-        if (logger.isTraceEnabled())
-            logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
+        this(resolver,
+             consistencyLevel,
+             consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)),
+             command,
+             Keyspace.open(command.metadata().ksName),
+             filteredEndpoints);
     }
 
-    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
     {
         this.command = command;
         this.keyspace = keyspace;
@@ -86,7 +89,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         this.start = System.nanoTime();
         this.endpoints = endpoints;
         // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
-        assert !(resolver instanceof RangeSliceResponseResolver) || blockfor >= endpoints.size();
+        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
+
+        if (logger.isTraceEnabled())
+            logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
     }
 
     public boolean await(long timePastStart, TimeUnit unit)
@@ -102,31 +108,46 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         }
     }
 
-    public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
-        if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
+        boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
+        boolean failed = blockfor + failures > endpoints.size();
+        if (signaled && !failed)
+            return;
+
+        if (Tracing.isTracing())
         {
-            // Same as for writes, see AbstractWriteResponseHandler
-            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
-            Tracing.trace("Read timeout: {}", ex.toString());
-            if (logger.isDebugEnabled())
-                logger.debug("Read timeout: {}", ex.toString());
-            throw ex;
+            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
         }
-
-        if (blockfor + failures > endpoints.size())
+        else if (logger.isDebugEnabled())
         {
-            ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent());
-
-            if (logger.isDebugEnabled())
-                logger.debug("Read failure: {}", ex.toString());
-            throw ex;
+            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
         }
 
-        return blockfor == 1 ? resolver.getData() : resolver.resolve();
+        // Same as for writes, see AbstractWriteResponseHandler
+        throw failed
+            ? new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent())
+            : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
+    }
+
+    public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    {
+        awaitResults();
+
+        PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
+        if (logger.isDebugEnabled())
+            logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+        return result;
+    }
+
+    public int blockFor()
+    {
+        return blockfor;
     }
 
-    public void response(MessageIn<TMessage> message)
+    public void response(MessageIn<ReadResponse> message)
     {
         resolver.preprocess(message);
         int n = waitingFor(message.from)
@@ -165,13 +186,13 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         return received;
     }
 
-    public void response(TMessage result)
+    public void response(ReadResponse result)
     {
-        MessageIn<TMessage> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
-                                                       result,
-                                                       Collections.<String, byte[]>emptyMap(),
-                                                       MessagingService.Verb.INTERNAL_RESPONSE,
-                                                       MessagingService.current_version);
+        MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                                           result,
+                                                           Collections.<String, byte[]>emptyMap(),
+                                                           MessagingService.Verb.INTERNAL_RESPONSE,
+                                                           MessagingService.current_version);
         response(message);
     }
 
@@ -196,7 +217,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
 
         public void run()
         {
-            // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch.
+            // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
             // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
             // get a digest mismatch)
             try
@@ -205,7 +226,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
             }
             catch (DigestMismatchException e)
             {
-                assert resolver instanceof RowDigestResolver;
+                assert resolver instanceof DigestResolver;
 
                 if (traceState != null)
                     traceState.trace("Digest mismatch: {}", e.toString());
@@ -214,11 +235,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
                 
                 ReadRepairMetrics.repairedBackground.mark();
                 
-                ReadCommand readCommand = (ReadCommand) command;
-                final RowDataResolver repairResolver = new RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), readCommand.timestamp, endpoints.size());
+                final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
-                MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
+                MessageOut<ReadCommand> message = command.createMessage();
                 for (InetAddress endpoint : endpoints)
                     MessagingService.instance().sendRR(message, endpoint, repairHandler);
             }
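
The new awaitResults() above folds timeout and failure handling into one decision: wait up to the command timeout, then classify the outcome by whether blockfor + failures exceeds the endpoint count. A standalone sketch of that decision logic (plain RuntimeExceptions stand in for ReadFailureException/ReadTimeoutException):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class AwaitResultsSketch
    {
        static void awaitResults(CountDownLatch condition, long timeoutMillis,
                                 int blockfor, int failures, int endpoints)
                throws InterruptedException
        {
            boolean signaled = condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
            // Once blockfor + failures exceeds the endpoint count, blockfor
            // responses can never arrive: classify as failure, not timeout.
            boolean failed = blockfor + failures > endpoints;
            if (signaled && !failed)
                return; // enough responses arrived in time

            throw failed
                ? new RuntimeException("read failed (" + failures + " replica failures)")
                : new RuntimeException("read timed out waiting for " + blockfor + " responses");
        }

        public static void main(String[] args) throws InterruptedException
        {
            CountDownLatch done = new CountDownLatch(0); // already satisfied
            awaitResults(done, 100, 2, 0, 3);            // returns normally
        }
    }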

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
new file mode 100644
index 0000000..e7c94a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.concurrent.Accumulator;
+
+public abstract class ResponseResolver
+{
+    protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
+
+    protected final Keyspace keyspace;
+    protected final ReadCommand command;
+    protected final ConsistencyLevel consistency;
+
+    // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
+    protected final Accumulator<MessageIn<ReadResponse>> responses;
+
+    public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        this.keyspace = keyspace;
+        this.command = command;
+        this.consistency = consistency;
+        this.responses = new Accumulator<>(maxResponseCount);
+    }
+
+    public abstract PartitionIterator getData();
+    public abstract PartitionIterator resolve() throws DigestMismatchException;
+
+    public abstract boolean isDataPresent();
+
+    public void preprocess(MessageIn<ReadResponse> message)
+    {
+        responses.add(message);
+    }
+
+    public Iterable<MessageIn<ReadResponse>> getMessages()
+    {
+        return responses;
+    }
+}
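
The new base class stores responses in an Accumulator rather than a queue: a fixed-capacity, add-only structure where a slot is claimed with a single atomic increment. A simplified standalone sketch of that idea (not the real org.apache.cassandra.utils.concurrent.Accumulator, which additionally tracks how many slots are fully published):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class AccumulatorSketch<E> implements Iterable<E>
    {
        private final Object[] values;
        private final AtomicInteger nextIndex = new AtomicInteger();

        public AccumulatorSketch(int capacity)
        {
            values = new Object[capacity];
        }

        public void add(E e)
        {
            // Claim a slot without locking; capacity is maxResponseCount,
            // so a correct caller can never overflow.
            int i = nextIndex.getAndIncrement();
            if (i >= values.length)
                throw new IllegalStateException("too many responses");
            values[i] = e;
        }

        @SuppressWarnings("unchecked")
        public Iterator<E> iterator()
        {
            // Snapshot published slots; the real Accumulator uses a separate
            // presentCount so claimed-but-unwritten slots are never observed.
            List<E> snapshot = new ArrayList<>();
            for (int i = 0; i < Math.min(nextIndex.get(), values.length); i++)
                if (values[i] != null)
                    snapshot.add((E) values[i]);
            return snapshot.iterator();
        }

        public static void main(String[] args)
        {
            AccumulatorSketch<String> acc = new AccumulatorSketch<>(3);
            acc.add("response-1");
            acc.add("response-2");
            for (String s : acc)
                System.out.println(s);
        }
    }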

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
deleted file mode 100644
index e935ce7..0000000
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class RowDataResolver extends AbstractRowResolver
-{
-    private int maxLiveCount = 0;
-    public List<AsyncOneResponse> repairResults = Collections.emptyList();
-    private final IDiskAtomFilter filter;
-    private final long timestamp;
-
-    public RowDataResolver(String keyspaceName, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp, int maxResponseCount)
-    {
-        super(key, keyspaceName, maxResponseCount);
-        this.filter = qFilter;
-        this.timestamp = timestamp;
-    }
-
-    /*
-    * This method handles the following scenario:
-    *
-    * there was a mismatch on the initial read, so we redid the digest requests
-    * as full data reads.  In this case we need to compute the most recent version
-    * of each column, and send diffs to out-of-date replicas.
-    */
-    public Row resolve() throws DigestMismatchException
-    {
-        int replyCount = replies.size();
-        if (logger.isDebugEnabled())
-            logger.debug("resolving {} responses", replyCount);
-        long start = System.nanoTime();
-
-        ColumnFamily resolved;
-        if (replyCount > 1)
-        {
-            List<ColumnFamily> versions = new ArrayList<>(replyCount);
-            List<InetAddress> endpoints = new ArrayList<>(replyCount);
-
-            for (MessageIn<ReadResponse> message : replies)
-            {
-                ReadResponse response = message.payload;
-                ColumnFamily cf = response.row().cf;
-                assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
-                versions.add(cf);
-                endpoints.add(message.from);
-
-                // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveCount = cf == null ? 0 : filter.getLiveCount(cf, timestamp);
-                if (liveCount > maxLiveCount)
-                    maxLiveCount = liveCount;
-            }
-
-            resolved = resolveSuperset(versions, timestamp);
-            if (logger.isDebugEnabled())
-                logger.debug("versions merged");
-
-            // send updates to any replica that was missing part of the full row
-            // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet)
-            if (resolved != null)
-                repairResults = scheduleRepairs(resolved, keyspaceName, key, versions, endpoints);
-        }
-        else
-        {
-            resolved = replies.get(0).payload.row().cf;
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("resolve: {} ms.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-        return new Row(key, resolved);
-    }
-
-    /**
-     * For each row version, compare with resolved (the superset of all row versions);
-     * if it is missing anything, send a mutation to the endpoint it came from.
-     */
-    public static List<AsyncOneResponse> scheduleRepairs(ColumnFamily resolved, String keyspaceName, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
-    {
-        List<AsyncOneResponse> results = new ArrayList<AsyncOneResponse>(versions.size());
-
-        for (int i = 0; i < versions.size(); i++)
-        {
-            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
-            if (diffCf == null) // no repair needs to happen
-                continue;
-
-            // create and send the mutation message based on the diff
-            Mutation mutation = new Mutation(keyspaceName, key.getKey(), diffCf);
-            // use a separate verb here because we don't want these to get the white glove hint-
-            // on-timeout behavior that a "real" mutation gets
-            Tracing.trace("Sending read-repair-mutation to {}", 
endpoints.get(i));
-            
results.add(MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR),
-                                                           endpoints.get(i)));
-        }
-
-        return results;
-    }
-
-    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions, long now)
-    {
-        assert Iterables.size(versions) > 0;
-
-        ColumnFamily resolved = null;
-        for (ColumnFamily cf : versions)
-        {
-            if (cf == null)
-                continue;
-
-            if (resolved == null)
-                resolved = cf.cloneMeShallow();
-            else
-                resolved.delete(cf);
-        }
-        if (resolved == null)
-            return null;
-
-        // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
-        // this will handle removing columns and subcolumns that are suppressed by a row or
-        // supercolumn tombstone.
-        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter(), now);
-        List<CloseableIterator<Cell>> iters = new ArrayList<>(Iterables.size(versions));
-        for (ColumnFamily version : versions)
-            if (version != null)
-                iters.add(FBUtilities.closeableIterator(version.iterator()));
-        filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
-        return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
-    }
-
-    public Row getData()
-    {
-        assert !replies.isEmpty();
-        return replies.get(0).payload.row();
-    }
-
-    public boolean isDataPresent()
-    {
-        return !replies.isEmpty();
-    }
-
-    public int getMaxLiveCount()
-    {
-        return maxLiveCount;
-    }
-}
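
The deleted RowDataResolver implements read repair in two steps: merge every replica's version into a resolved superset, then send each replica the difference between its version and that superset. A standalone sketch of the idea with plain maps (last-write-wins on plain values; the real code merges ColumnFamily objects with timestamps and tombstones, and value conflicts are omitted here for brevity):

    import java.util.*;

    public class RepairDiffSketch
    {
        // Merge all replica versions (column -> value) into one superset.
        static Map<String, String> resolveSuperset(List<Map<String, String>> versions)
        {
            Map<String, String> resolved = new HashMap<>();
            for (Map<String, String> v : versions)
                if (v != null)
                    resolved.putAll(v);
            return resolved;
        }

        // The "diff" sent as a repair mutation: everything in resolved that
        // this replica's version lacks.
        static Map<String, String> diff(Map<String, String> version,
                                        Map<String, String> resolved)
        {
            Map<String, String> missing = new HashMap<>(resolved);
            if (version != null)
                missing.keySet().removeAll(version.keySet());
            return missing; // empty map => no repair needed for this replica
        }

        public static void main(String[] args)
        {
            Map<String, String> a = Map.of("c1", "x");
            Map<String, String> b = Map.of("c2", "y");
            Map<String, String> resolved = resolveSuperset(List.of(a, b));
            System.out.println(diff(a, resolved)); // {c2=y} -> repair sent to replica a
        }
    }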

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
deleted file mode 100644
index 82ccc1a..0000000
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.MessageIn;
-
-public class RowDigestResolver extends AbstractRowResolver
-{
-    public RowDigestResolver(String keyspaceName, ByteBuffer key, int maxResponseCount)
-    {
-        super(key, keyspaceName, maxResponseCount);
-    }
-
-    /**
-     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
-     */
-    public Row getData()
-    {
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            ReadResponse result = message.payload;
-            if (!result.isDigestQuery())
-                return result.row();
-        }
-        return null;
-    }
-
-    /*
-     * This method handles two different scenarios:
-     *
-     * a) we're handling the initial read, of data from the closest replica + digests
-     *    from the rest.  In this case we check the digests against each other,
-     *    throw an exception if there is a mismatch, otherwise return the data row.
-     *
-     * b) we're checking additional digests that arrived after the minimum to handle
-     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
-     */
-    public Row resolve() throws DigestMismatchException
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("resolving {} responses", replies.size());
-
-        long start = System.nanoTime();
-
-        // validate digests against each other; throw immediately on mismatch.
-        // also extract the data reply, if any.
-        ColumnFamily data = null;
-        ByteBuffer digest = null;
-
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            ReadResponse response = message.payload;
-
-            ByteBuffer newDigest;
-            if (response.isDigestQuery())
-            {
-                newDigest = response.digest();
-            }
-            else
-            {
-                // note that this allows for multiple data replies, post-CASSANDRA-5932
-                data = response.row().cf;
-                newDigest = ColumnFamily.digest(data);
-            }
-
-            if (digest == null)
-                digest = newDigest;
-            else if (!digest.equals(newDigest))
-                throw new DigestMismatchException(key, digest, newDigest);
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("resolve: {} ms.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-        return new Row(key, data);
-    }
-
-    public boolean isDataPresent()
-    {
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            if (!message.payload.isDigestQuery())
-                return true;
-        }
-        return false;
-    }
-}
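
The deleted digest resolution reduces to: hash the one full data response, then compare every digest response against that hash, failing fast on the first mismatch. A standalone sketch of that loop (MD5 is used here only because it is convenient to demo; treat the choice of digest function as an assumption, and IllegalStateException stands in for DigestMismatchException):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.List;

    public class DigestCheckSketch
    {
        static ByteBuffer digest(String data) throws Exception
        {
            return ByteBuffer.wrap(MessageDigest.getInstance("MD5")
                                                .digest(data.getBytes(StandardCharsets.UTF_8)));
        }

        static void checkDigests(String dataResponse, List<ByteBuffer> digestResponses)
                throws Exception
        {
            // Hash the data reply once, then compare each digest reply to it.
            ByteBuffer expected = digest(dataResponse);
            for (ByteBuffer d : digestResponses)
                if (!expected.equals(d))
                    throw new IllegalStateException("digest mismatch");
        }

        public static void main(String[] args) throws Exception
        {
            checkDigests("row", List.of(digest("row")));          // ok
            // checkDigests("row", List.of(digest("stale-row"))); // would throw
        }
    }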
