Merge branch 'cassandra-1.2' into trunk

Conflicts:
        src/java/org/apache/cassandra/cql/QueryProcessor.java
        src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
        src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
        src/java/org/apache/cassandra/service/ReadCallback.java
        src/java/org/apache/cassandra/service/RowDataResolver.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/27392484
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/27392484
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/27392484

Branch: refs/heads/trunk
Commit: 273924847b255f7c358defa41fa5906d55a22025
Parents: e306a87 6a03b11
Author: Aleksey Yeschenko <[email protected]>
Authored: Thu Apr 4 02:10:43 2013 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Thu Apr 4 02:13:26 2013 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/cql/QueryProcessor.java   |   31 +++--------
 .../cql3/statements/ModificationStatement.java     |   45 ++++++---------
 .../cassandra/cql3/statements/SelectStatement.java |   25 ++-------
 .../org/apache/cassandra/db/CounterMutation.java   |    2 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |    2 +-
 .../org/apache/cassandra/db/ReadVerbHandler.java   |   25 +++------
 .../cassandra/db/RetriedSliceFromReadCommand.java  |    1 -
 .../cassandra/db/SliceByNamesReadCommand.java      |    2 +-
 .../cassandra/service/AbstractReadExecutor.java    |    2 +-
 .../cassandra/service/IResponseResolver.java       |    6 +-
 .../service/RangeSliceResponseResolver.java        |    5 +-
 .../org/apache/cassandra/service/ReadCallback.java |    8 +--
 .../apache/cassandra/service/RowDataResolver.java  |    5 +-
 .../cassandra/service/RowDigestResolver.java       |    3 +-
 .../org/apache/cassandra/service/StorageProxy.java |   10 ++--
 .../apache/cassandra/thrift/CassandraServer.java   |   16 -----
 16 files changed, 58 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql/QueryProcessor.java
index e2fba6f,5977301..b365644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@@ -183,20 -179,14 +175,13 @@@ public class QueryProcesso
                    ? select.getNumRecords() + 1
                    : select.getNumRecords();
  
-         try
-         {
-             rows = StorageProxy.getRangeSlice(new 
RangeSliceCommand(metadata.ksName,
-                                                                     
select.getColumnFamily(),
-                                                                     
columnFilter,
-                                                                     bounds,
-                                                                     
expressions,
-                                                                     limit),
-                                                                     
select.getConsistencyLevel());
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
-         }
+         List<org.apache.cassandra.db.Row> rows = 
StorageProxy.getRangeSlice(new RangeSliceCommand(metadata.ksName,
+                                                                               
                    select.getColumnFamily(),
 -                                                                              
                    null,
+                                                                               
                    columnFilter,
+                                                                               
                    bounds,
+                                                                               
                    expressions,
+                                                                               
                    limit),
+                                                                             
select.getConsistencyLevel());
  
          // if start key was set and relation was "greater than"
          if (select.getKeyStart() != null && !select.includeStartKey() && 
!rows.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 6118937,28a003e..87843d2
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -164,36 -163,29 +162,29 @@@ public abstract class ModificationState
          for (ByteBuffer key : keys)
              commands.add(new SliceFromReadCommand(keyspace(),
                                                    key,
 -                                                  new 
QueryPath(columnFamily()),
 +                                                  columnFamily(),
                                                    new 
SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
  
-         try
-         {
-             List<Row> rows = local
-                            ? SelectStatement.readLocally(keyspace(), commands)
-                            : StorageProxy.read(commands, cl);
- 
-             Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, 
ColumnGroupMap>();
-             for (Row row : rows)
-             {
-                 if (row.cf == null || row.cf.isEmpty())
-                     continue;
- 
-                 ColumnGroupMap.Builder groupBuilder = new 
ColumnGroupMap.Builder(composite, true);
-                 for (Column column : row.cf)
-                     groupBuilder.add(column);
- 
-                 List<ColumnGroupMap> groups = groupBuilder.groups();
-                 assert groups.isEmpty() || groups.size() == 1;
-                 if (!groups.isEmpty())
-                     map.put(row.key.key, groups.get(0));
-             }
-             return map;
-         }
-         catch (IOException e)
+         List<Row> rows = local
+                        ? SelectStatement.readLocally(keyspace(), commands)
+                        : StorageProxy.read(commands, cl);
+ 
+         Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, 
ColumnGroupMap>();
+         for (Row row : rows)
          {
-             throw new IOError(e);
+             if (row.cf == null || row.cf.isEmpty())
+                 continue;
+ 
+             ColumnGroupMap.Builder groupBuilder = new 
ColumnGroupMap.Builder(composite, true);
 -            for (IColumn column : row.cf)
++            for (Column column : row.cf)
+                 groupBuilder.add(column);
+ 
+             List<ColumnGroupMap> groups = groupBuilder.groups();
+             assert groups.isEmpty() || groups.size() == 1;
+             if (!groups.isEmpty())
+                 map.put(row.key.key, groups.get(0));
          }
+         return map;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index ffcab0a,bb3b7b9..82ba720
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@@ -50,10 -58,10 +50,10 @@@ public class SliceByNamesReadCommand ex
          return readCommand;
      }
  
-     public Row getRow(Table table) throws IOException
+     public Row getRow(Table table)
      {
          DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
 -        return table.getRow(new QueryFilter(dk, queryPath, filter));
 +        return table.getRow(new QueryFilter(dk, cfName, filter));
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7a15bda,0000000..9c17678
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@@ -1,245 -1,0 +1,245 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.Arrays;
 +import java.util.Comparator;
 +import java.util.List;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadResponse;
 +import org.apache.cassandra.db.Row;
 +import org.apache.cassandra.db.Table;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.exceptions.UnavailableException;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.primitives.Longs;
 +
 +public abstract class AbstractReadExecutor
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(AbstractReadExecutor.class);
 +    protected final ReadCallback<ReadResponse, Row> handler;
 +    protected final ReadCommand command;
 +    protected final RowDigestResolver resolver;
 +    protected final List<InetAddress> unfiltered;
 +    protected final List<InetAddress> endpoints;
 +    protected final ColumnFamilyStore cfs;
 +
 +    AbstractReadExecutor(ColumnFamilyStore cfs,
 +                         ReadCommand command,
 +                         ConsistencyLevel consistency_level,
 +                         List<InetAddress> allReplicas,
 +                         List<InetAddress> queryTargets)
 +    throws UnavailableException
 +    {
 +        unfiltered = allReplicas;
 +        this.endpoints = queryTargets;
 +        this.resolver = new RowDigestResolver(command.table, command.key);
 +        this.handler = new ReadCallback<ReadResponse, Row>(resolver, 
consistency_level, command, this.endpoints);
 +        this.command = command;
 +        this.cfs = cfs;
 +
 +        handler.assureSufficientLiveNodes();
 +        assert !handler.endpoints.isEmpty();
 +    }
 +
 +    void executeAsync()
 +    {
 +        // The data-request message is sent to dataPoint, the node that will actually get the data for us
 +        InetAddress dataPoint = handler.endpoints.get(0);
 +        if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && 
StorageProxy.OPTIMIZE_LOCAL_REQUESTS)
 +        {
 +            logger.trace("reading data locally");
 +            StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(command, handler));
 +        }
 +        else
 +        {
 +            logger.trace("reading data from {}", dataPoint);
 +            MessagingService.instance().sendRR(command.createMessage(), 
dataPoint, handler);
 +        }
 +
 +        if (handler.endpoints.size() == 1)
 +            return;
 +
 +        // send the other endpoints a digest request
 +        ReadCommand digestCommand = command.copy();
 +        digestCommand.setDigestQuery(true);
 +        MessageOut<?> message = null;
 +        for (int i = 1; i < handler.endpoints.size(); i++)
 +        {
 +            InetAddress digestPoint = handler.endpoints.get(i);
 +            if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
 +            {
 +                logger.trace("reading digest locally");
 +                StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(digestCommand, handler));
 +            }
 +            else
 +            {
 +                logger.trace("reading digest from {}", digestPoint);
 +                // (We lazy-construct the digest Message object since it may not be necessary if we
 +                // are doing a local digest read, or no digest reads at all.)
 +                if (message == null)
 +                    message = digestCommand.createMessage();
 +                MessagingService.instance().sendRR(message, digestPoint, 
handler);
 +            }
 +        }
 +    }
 +
 +    void speculate()
 +    {
 +        // noop by default.
 +    }
 +
-     Row get() throws ReadTimeoutException, DigestMismatchException, 
IOException
++    Row get() throws ReadTimeoutException, DigestMismatchException
 +    {
 +        return handler.get();
 +    }
 +
 +    public static AbstractReadExecutor getReadExecutor(ReadCommand command, 
ConsistencyLevel consistency_level) throws UnavailableException
 +    {
 +        Table table = Table.open(command.table);
 +        List<InetAddress> allReplicas = 
StorageProxy.getLiveSortedEndpoints(table, command.key);
 +        CFMetaData metaData = Schema.instance.getCFMetaData(command.table, 
command.cfName);
 +        List<InetAddress> queryTargets = 
consistency_level.filterForQuery(table, allReplicas, 
metaData.newReadRepairDecision());
 +
 +        if (StorageService.instance.isClientMode())
 +        {
 +            return new DefaultReadExecutor(null, command, consistency_level, 
allReplicas, queryTargets);
 +        }
 +
 +        ColumnFamilyStore cfs = table.getColumnFamilyStore(command.cfName);
 +
 +        switch (metaData.getSpeculativeRetry().type)
 +        {
 +            case ALWAYS:
 +                return new SpeculateAlwaysExecutor(cfs, command, 
consistency_level, allReplicas, queryTargets);
 +            case PERCENTILE:
 +            case CUSTOM:
 +                return queryTargets.size() < allReplicas.size()
 +                       ? new SpeculativeReadExecutor(cfs, command, 
consistency_level, allReplicas, queryTargets)
 +                       : new DefaultReadExecutor(cfs, command, 
consistency_level, allReplicas, queryTargets);
 +            default:
 +                return new DefaultReadExecutor(cfs, command, 
consistency_level, allReplicas, queryTargets);
 +        }
 +    }
 +
 +    private static class DefaultReadExecutor extends AbstractReadExecutor
 +    {
 +        public DefaultReadExecutor(ColumnFamilyStore cfs, ReadCommand 
command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, 
List<InetAddress> queryTargets) throws UnavailableException
 +        {
 +            super(cfs, command, consistency_level, allReplicas, queryTargets);
 +        }
 +    }
 +
 +    private static class SpeculativeReadExecutor extends AbstractReadExecutor
 +    {
 +        public SpeculativeReadExecutor(ColumnFamilyStore cfs, ReadCommand 
command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, 
List<InetAddress> queryTargets) throws UnavailableException
 +        {
 +            super(cfs, command, consistency_level, allReplicas, queryTargets);
 +            assert handler.endpoints.size() < unfiltered.size();
 +        }
 +
 +        @Override
 +        void speculate()
 +        {
 +            // no latency information, or we're overloaded
 +            if (cfs.sampleLatency > command.getTimeout())
 +                return;
 +
 +            if (!handler.await(cfs.sampleLatency))
 +            {
 +                InetAddress endpoint = 
unfiltered.get(handler.endpoints.size());
 +
 +                // could be waiting on the data, or on enough digests
 +                ReadCommand scommand = command;
 +                if (resolver.getData() != null)
 +                {
 +                    scommand = command.copy();
 +                    scommand.setDigestQuery(true);
 +                }
 +
 +                logger.trace("Speculating read retry on {}", endpoint);
 +                MessagingService.instance().sendRR(scommand.createMessage(), 
endpoint, handler);
 +                cfs.metric.speculativeRetry.inc();
 +            }
 +        }
 +    }
 +
 +    private static class SpeculateAlwaysExecutor extends AbstractReadExecutor
 +    {
 +        public SpeculateAlwaysExecutor(ColumnFamilyStore cfs, ReadCommand 
command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, 
List<InetAddress> queryTargets) throws UnavailableException
 +        {
 +            super(cfs, command, consistency_level, allReplicas, queryTargets);
 +        }
 +
 +        @Override
 +        void executeAsync()
 +        {
 +            int limit = unfiltered.size() >= 2 ? 2 : 1;
 +            for (int i = 0; i < limit; i++)
 +            {
 +                InetAddress endpoint = unfiltered.get(i);
 +                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
 +                {
 +                    logger.trace("reading full data locally");
 +                    StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(command, handler));
 +                }
 +                else
 +                {
 +                    logger.trace("reading full data from {}", endpoint);
 +                    
MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
 +                }
 +            }
 +            if (handler.endpoints.size() <= limit)
 +                return;
 +
 +            ReadCommand digestCommand = command.copy();
 +            digestCommand.setDigestQuery(true);
 +            MessageOut<?> message = digestCommand.createMessage();
 +            for (int i = limit; i < handler.endpoints.size(); i++)
 +            {
 +                // Send the message
 +                InetAddress endpoint = handler.endpoints.get(i);
 +                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
 +                {
 +                    logger.trace("reading data locally, isDigest: {}", 
command.isDigestQuery());
 +                    StageManager.getStage(Stage.READ).execute(new 
LocalReadRunnable(digestCommand, handler));
 +                }
 +                else
 +                {
 +                    logger.trace("reading full data from {}, isDigest: {}", 
endpoint, command.isDigestQuery());
 +                    MessagingService.instance().sendRR(message, endpoint, 
handler);
 +                }
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/IResponseResolver.java
index 6e1c04a,17c8bff..0c54690
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@@ -38,8 -36,8 +36,8 @@@ public interface IResponseResolver<TMes
      /**
       * returns the data response without comparing with any digests
       */
-     public TResolved getData() throws IOException;
+     public TResolved getData();
  
 -    public void preprocess(MessageIn<TMessage> message);
 +    public boolean preprocess(MessageIn<TMessage> message);
      public Iterable<MessageIn<TMessage>> getMessages();
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ReadCallback.java
index a1caff6,a19df5f..560e577
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@@ -95,18 -94,10 +93,18 @@@ public class ReadCallback<TMessage, TRe
          {
              throw new AssertionError(ex);
          }
 +    }
  
-     public TResolved get() throws ReadTimeoutException, 
DigestMismatchException, IOException
 -        if (!success)
 -            throw new ReadTimeoutException(consistencyLevel, received.get(), 
blockfor, resolver.isDataPresent());
 -
++    public TResolved get() throws ReadTimeoutException, 
DigestMismatchException
 +    {
 +        long timeout = command.getTimeout() - (System.currentTimeMillis() - 
startTime);
 +        if (!await(timeout))
 +        {
 +            ReadTimeoutException ex = new 
ReadTimeoutException(consistencyLevel, received.get(), blockfor, 
resolver.isDataPresent());
 +            if (logger.isDebugEnabled())
 +                logger.debug("Read timeout: {}", ex.toString());
 +            throw ex;
 +        }
          return blockfor == 1 ? resolver.getData() : resolver.resolve();
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------

Reply via email to