Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/cql/QueryProcessor.java
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
src/java/org/apache/cassandra/service/ReadCallback.java
src/java/org/apache/cassandra/service/RowDataResolver.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/27392484
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/27392484
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/27392484
Branch: refs/heads/trunk
Commit: 273924847b255f7c358defa41fa5906d55a22025
Parents: e306a87 6a03b11
Author: Aleksey Yeschenko <[email protected]>
Authored: Thu Apr 4 02:10:43 2013 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Thu Apr 4 02:13:26 2013 +0300
----------------------------------------------------------------------
.../org/apache/cassandra/cql/QueryProcessor.java | 31 +++--------
.../cql3/statements/ModificationStatement.java | 45 ++++++---------
.../cassandra/cql3/statements/SelectStatement.java | 25 ++-------
.../org/apache/cassandra/db/CounterMutation.java | 2 +-
src/java/org/apache/cassandra/db/ReadCommand.java | 2 +-
.../org/apache/cassandra/db/ReadVerbHandler.java | 25 +++------
.../cassandra/db/RetriedSliceFromReadCommand.java | 1 -
.../cassandra/db/SliceByNamesReadCommand.java | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 2 +-
.../cassandra/service/IResponseResolver.java | 6 +-
.../service/RangeSliceResponseResolver.java | 5 +-
.../org/apache/cassandra/service/ReadCallback.java | 8 +--
.../apache/cassandra/service/RowDataResolver.java | 5 +-
.../cassandra/service/RowDigestResolver.java | 3 +-
.../org/apache/cassandra/service/StorageProxy.java | 10 ++--
.../apache/cassandra/thrift/CassandraServer.java | 16 -----
16 files changed, 58 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql/QueryProcessor.java
index e2fba6f,5977301..b365644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@@ -183,20 -179,14 +175,13 @@@ public class QueryProcesso
                      ? select.getNumRecords() + 1
                      : select.getNumRecords();
-                 try
-                 {
-                     rows = StorageProxy.getRangeSlice(new RangeSliceCommand(metadata.ksName,
-                                                                             select.getColumnFamily(),
-                                                                             columnFilter,
-                                                                             bounds,
-                                                                             expressions,
-                                                                             limit),
-                                                       select.getConsistencyLevel());
-                 }
-                 catch (IOException e)
-                 {
-                     throw new RuntimeException(e);
-                 }
+                 List<org.apache.cassandra.db.Row> rows = StorageProxy.getRangeSlice(new RangeSliceCommand(metadata.ksName,
+                                                                                                           select.getColumnFamily(),
-                                                                                                           null,
+                                                                                                           columnFilter,
+                                                                                                           bounds,
+                                                                                                           expressions,
+                                                                                                           limit),
+                                                                                     select.getConsistencyLevel());
              // if start key was set and relation was "greater than"
              if (select.getKeyStart() != null && !select.includeStartKey() && !rows.isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 6118937,28a003e..87843d2
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -164,36 -163,29 +162,29 @@@ public abstract class ModificationState
              for (ByteBuffer key : keys)
                  commands.add(new SliceFromReadCommand(keyspace(),
                                                        key,
-                                                       new QueryPath(columnFamily()),
+                                                       columnFamily(),
                                                        new SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
-         try
-         {
-             List<Row> rows = local
-                            ? SelectStatement.readLocally(keyspace(), commands)
-                            : StorageProxy.read(commands, cl);
-
-             Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
-             for (Row row : rows)
-             {
-                 if (row.cf == null || row.cf.isEmpty())
-                     continue;
-
-                 ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
-                 for (Column column : row.cf)
-                     groupBuilder.add(column);
-
-                 List<ColumnGroupMap> groups = groupBuilder.groups();
-                 assert groups.isEmpty() || groups.size() == 1;
-                 if (!groups.isEmpty())
-                     map.put(row.key.key, groups.get(0));
-             }
-             return map;
-         }
-         catch (IOException e)
+         List<Row> rows = local
+                        ? SelectStatement.readLocally(keyspace(), commands)
+                        : StorageProxy.read(commands, cl);
+
+         Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
+         for (Row row : rows)
          {
-             throw new IOError(e);
+             if (row.cf == null || row.cf.isEmpty())
+                 continue;
+
+             ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
-             for (IColumn column : row.cf)
++            for (Column column : row.cf)
+                 groupBuilder.add(column);
+
+             List<ColumnGroupMap> groups = groupBuilder.groups();
+             assert groups.isEmpty() || groups.size() == 1;
+             if (!groups.isEmpty())
+                 map.put(row.key.key, groups.get(0));
          }
+         return map;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index ffcab0a,bb3b7b9..82ba720
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@@ -50,10 -58,10 +50,10 @@@ public class SliceByNamesReadCommand ex
return readCommand;
}
- public Row getRow(Table table) throws IOException
+ public Row getRow(Table table)
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(new QueryFilter(dk, queryPath, filter));
+ return table.getRow(new QueryFilter(dk, cfName, filter));
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7a15bda,0000000..9c17678
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@@ -1,245 -1,0 +1,245 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Longs;
+
+public abstract class AbstractReadExecutor
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
+ protected final ReadCallback<ReadResponse, Row> handler;
+ protected final ReadCommand command;
+ protected final RowDigestResolver resolver;
+ protected final List<InetAddress> unfiltered;
+ protected final List<InetAddress> endpoints;
+ protected final ColumnFamilyStore cfs;
+
+ AbstractReadExecutor(ColumnFamilyStore cfs,
+ ReadCommand command,
+ ConsistencyLevel consistency_level,
+ List<InetAddress> allReplicas,
+ List<InetAddress> queryTargets)
+ throws UnavailableException
+ {
+ unfiltered = allReplicas;
+ this.endpoints = queryTargets;
+ this.resolver = new RowDigestResolver(command.table, command.key);
+        this.handler = new ReadCallback<ReadResponse, Row>(resolver, consistency_level, command, this.endpoints);
+ this.command = command;
+ this.cfs = cfs;
+
+ handler.assureSufficientLiveNodes();
+ assert !handler.endpoints.isEmpty();
+ }
+
+ void executeAsync()
+ {
+        // The data-request message is sent to dataPoint, the node that will actually get the data for us
+        InetAddress dataPoint = handler.endpoints.get(0);
+        if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS)
+        {
+            logger.trace("reading data locally");
+            StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+        }
+        else
+        {
+            logger.trace("reading data from {}", dataPoint);
+            MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
+        }
+
+ if (handler.endpoints.size() == 1)
+ return;
+
+ // send the other endpoints a digest request
+ ReadCommand digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ MessageOut<?> message = null;
+ for (int i = 1; i < handler.endpoints.size(); i++)
+ {
+ InetAddress digestPoint = handler.endpoints.get(i);
+ if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ logger.trace("reading digest locally");
+                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
+ }
+ else
+ {
+ logger.trace("reading digest from {}", digestPoint);
+                // (We lazy-construct the digest Message object since it may not be necessary if we
+                // are doing a local digest read, or no digest reads at all.)
+                if (message == null)
+                    message = digestCommand.createMessage();
+                MessagingService.instance().sendRR(message, digestPoint, handler);
+ }
+ }
+ }
+
+ void speculate()
+ {
+ // noop by default.
+ }
+
-     Row get() throws ReadTimeoutException, DigestMismatchException, IOException
++    Row get() throws ReadTimeoutException, DigestMismatchException
+ {
+ return handler.get();
+ }
+
+    public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistency_level) throws UnavailableException
+ {
+ Table table = Table.open(command.table);
+        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(table, command.key);
+        CFMetaData metaData = Schema.instance.getCFMetaData(command.table, command.cfName);
+        List<InetAddress> queryTargets = consistency_level.filterForQuery(table, allReplicas, metaData.newReadRepairDecision());
+
+ if (StorageService.instance.isClientMode())
+ {
+            return new DefaultReadExecutor(null, command, consistency_level, allReplicas, queryTargets);
+ }
+
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(command.cfName);
+
+ switch (metaData.getSpeculativeRetry().type)
+ {
+ case ALWAYS:
+                return new SpeculateAlwaysExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+            case PERCENTILE:
+            case CUSTOM:
+                return queryTargets.size() < allReplicas.size()
+                       ? new SpeculativeReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets)
+                       : new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+            default:
+                return new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+ }
+ }
+
+ private static class DefaultReadExecutor extends AbstractReadExecutor
+ {
+        public DefaultReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ {
+ super(cfs, command, consistency_level, allReplicas, queryTargets);
+ }
+ }
+
+ private static class SpeculativeReadExecutor extends AbstractReadExecutor
+ {
+        public SpeculativeReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ {
+ super(cfs, command, consistency_level, allReplicas, queryTargets);
+ assert handler.endpoints.size() < unfiltered.size();
+ }
+
+ @Override
+ void speculate()
+ {
+ // no latency information, or we're overloaded
+ if (cfs.sampleLatency > command.getTimeout())
+ return;
+
+ if (!handler.await(cfs.sampleLatency))
+ {
+                InetAddress endpoint = unfiltered.get(handler.endpoints.size());
+
+ // could be waiting on the data, or on enough digests
+ ReadCommand scommand = command;
+ if (resolver.getData() != null)
+ {
+ scommand = command.copy();
+ scommand.setDigestQuery(true);
+ }
+
+ logger.trace("Speculating read retry on {}", endpoint);
+                MessagingService.instance().sendRR(scommand.createMessage(), endpoint, handler);
+ cfs.metric.speculativeRetry.inc();
+ }
+ }
+ }
+
+ private static class SpeculateAlwaysExecutor extends AbstractReadExecutor
+ {
+        public SpeculateAlwaysExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ {
+ super(cfs, command, consistency_level, allReplicas, queryTargets);
+ }
+
+ @Override
+ void executeAsync()
+ {
+ int limit = unfiltered.size() >= 2 ? 2 : 1;
+ for (int i = 0; i < limit; i++)
+ {
+ InetAddress endpoint = unfiltered.get(i);
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ logger.trace("reading full data locally");
+                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+                }
+                else
+                {
+                    logger.trace("reading full data from {}", endpoint);
+                    MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
+ }
+ }
+ if (handler.endpoints.size() <= limit)
+ return;
+
+ ReadCommand digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ MessageOut<?> message = digestCommand.createMessage();
+ for (int i = limit; i < handler.endpoints.size(); i++)
+ {
+ // Send the message
+ InetAddress endpoint = handler.endpoints.get(i);
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ logger.trace("reading data locally, isDigest: {}",
command.isDigestQuery());
+ StageManager.getStage(Stage.READ).execute(new
LocalReadRunnable(digestCommand, handler));
+ }
+ else
+ {
+ logger.trace("reading full data from {}, isDigest: {}",
endpoint, command.isDigestQuery());
+ MessagingService.instance().sendRR(message, endpoint,
handler);
+ }
+ }
+ }
+ }
+}
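For orientation, the new AbstractReadExecutor above is meant to be driven from the coordinator read path. A minimal usage sketch of that call sequence follows; only the AbstractReadExecutor method calls are taken from the file above, while the surrounding variable names and the mismatch-handling comment are assumptions about the caller (not part of this commit):

    // Hedged caller sketch, not code from this commit.
    AbstractReadExecutor executor = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
    executor.executeAsync();  // data request to the first endpoint, digest requests to the rest
    executor.speculate();     // no-op unless speculative retry is configured for this column family
    try
    {
        Row row = executor.get();  // blocks for up to the command timeout
        // use row ...
    }
    catch (DigestMismatchException e)
    {
        // data and digest responses disagreed; the caller is assumed to fall back to a
        // full data read / read repair round (that logic lives outside this file)
    }
    // ReadTimeoutException and UnavailableException are left to propagate to the enclosing method.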
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/IResponseResolver.java
index 6e1c04a,17c8bff..0c54690
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@@ -38,8 -36,8 +36,8 @@@ public interface IResponseResolver<TMes
/**
* returns the data response without comparing with any digests
*/
- public TResolved getData() throws IOException;
+ public TResolved getData();
- public void preprocess(MessageIn<TMessage> message);
+ public boolean preprocess(MessageIn<TMessage> message);
public Iterable<MessageIn<TMessage>> getMessages();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ReadCallback.java
index a1caff6,a19df5f..560e577
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@@ -95,18 -94,10 +93,18 @@@ public class ReadCallback<TMessage, TRe
{
throw new AssertionError(ex);
}
+ }
-     public TResolved get() throws ReadTimeoutException, DigestMismatchException, IOException
-         if (!success)
-             throw new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
-
++    public TResolved get() throws ReadTimeoutException, DigestMismatchException
+    {
+        long timeout = command.getTimeout() - (System.currentTimeMillis() - startTime);
+        if (!await(timeout))
+        {
+            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+ if (logger.isDebugEnabled())
+ logger.debug("Read timeout: {}", ex.toString());
+ throw ex;
+ }
return blockfor == 1 ? resolver.getData() : resolver.resolve();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/27392484/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------