merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b0d43f6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b0d43f6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b0d43f6 Branch: refs/heads/trunk Commit: 5b0d43f6f2485f1439eb9d6a7ad112f6c9a515f3 Parents: 093e188 0c81eae Author: Jonathan Ellis <[email protected]> Authored: Tue Jun 18 08:44:17 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Jun 18 08:45:57 2013 -0500 ---------------------------------------------------------------------- .../apache/cassandra/cql3/QueryProcessor.java | 6 ++-- .../apache/cassandra/db/ReadVerbHandler.java | 2 -- .../apache/cassandra/service/StorageProxy.java | 36 +++----------------- 3 files changed, 6 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java index ac5afbc,513c96e..1b89fe3 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@@ -109,9 -111,10 +109,10 @@@ public class QueryProcesso private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException { + logger.trace("Process {} @CL.{}", statement, cl); ClientState clientState = queryState.getClientState(); - statement.validate(clientState); statement.checkAccess(clientState); + statement.validate(clientState); ResultMessage result = statement.execute(cl, queryState, variables); return result == null ? new ResultMessage.Void() : result; } @@@ -119,22 -122,10 +120,21 @@@ public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState) throws RequestExecutionException, RequestValidationException { + return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState); + } + + public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState) + throws RequestExecutionException, RequestValidationException + { - logger.trace("CQL QUERY: {}", queryString); CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement; - if (prepared.getBoundsTerms() > 0) - throw new InvalidRequestException("Cannot execute query with bind variables"); - return processStatement(prepared, cl, queryState, Collections.<ByteBuffer>emptyList()); + if (prepared.getBoundsTerms() != variables.size()) + throw new InvalidRequestException("Invalid amount of bind variables"); + return processStatement(prepared, cl, queryState, variables); + } + + public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException + { + return getStatement(queryStr, queryState.getClientState()).statement; } public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/db/ReadVerbHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0d43f6/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 0203e4b,adb3f2d..9d095cc --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -137,10 -136,8 +137,7 @@@ public class StorageProxy implements St AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException { - if (logger.isTraceEnabled()) - logger.trace("insert writing local & replicate " + mutation.toString(true)); - Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level); runnable.run(); } @@@ -153,10 -150,8 +150,7 @@@ AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException { - if (logger.isTraceEnabled()) - logger.trace("insert writing local & replicate " + mutation.toString(true)); - Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level); StageManager.getStage(Stage.MUTATION).execute(runnable); } @@@ -779,28 -505,14 +771,27 @@@ else { // belongs on a different server + if (message == null) + message = rm.createMessage(); - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); - Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc); - if (messages == null) + // direct writes to local DC or old Cassandra versions + // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) + if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20) + { + MessagingService.instance().sendRR(message, destination, responseHandler); + } + else { - messages = HashMultimap.create(); - dcMessages.put(dc, messages); + Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null; + if (messages == null) + { + messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas + if (dcGroups == null) + dcGroups = new HashMap<String, Collection<InetAddress>>(); + dcGroups.put(dc, messages); + } + messages.add(destination); } - messages.put(rm.createMessage(), destination); } } else @@@ -908,12 -650,9 +899,9 @@@ private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler) { - if (logger.isTraceEnabled()) - logger.trace("insert writing local " + rm.toString(true)); - Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION) { - public void runMayThrow() throws IOException + public void runMayThrow() { rm.apply(); responseHandler.response(null); @@@ -1159,17 -867,62 +1146,16 @@@ for (int i = 0; i < commands.size(); i++) { ReadCommand command = commands.get(i); - Table table = Table.open(command.getKeyspace()); assert !command.isDigestQuery(); - logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level); - List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key); - CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName()); - - ReadRepairDecision rrDecision = cfm.newReadRepairDecision(); - endpoints = consistency_level.filterForQuery(table, endpoints, rrDecision); - - if (rrDecision != ReadRepairDecision.NONE) { - ReadRepairMetrics.attempted.mark(); - } - - RowDigestResolver resolver = new RowDigestResolver(command.table, command.key); - ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints); - handler.assureSufficientLiveNodes(); - assert !endpoints.isEmpty(); - readCallbacks[i] = handler; - - // The data-request message is sent to dataPoint, the node that will actually get the data for us - InetAddress dataPoint = endpoints.get(0); - if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) - { - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler)); - } - else - { - Tracing.trace("Enqueuing data request to {}", dataPoint); - MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler); - } - - if (endpoints.size() == 1) - continue; - - // send the other endpoints a digest request - ReadCommand digestCommand = command.copy(); - digestCommand.setDigestQuery(true); - MessageOut message = null; - for (InetAddress digestPoint : endpoints.subList(1, endpoints.size())) - { - if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) - { - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler)); - } - else - { - Tracing.trace("Enqueuing digest request to {}", dataPoint); - // (We lazy-construct the digest Message object since it may not be necessary if we - // are doing a local digest read, or no digest reads at all.) - if (message == null) - message = digestCommand.createMessage(); - MessagingService.instance().sendRR(message, digestPoint, handler); - } - } + AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level); + exec.executeAsync(); + readExecutors[i] = exec; } + for (AbstractReadExecutor exec: readExecutors) + exec.speculate(); + // read results and make a second pass for any digest mismatches List<ReadCommand> repairCommands = null; List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null; @@@ -1305,12 -1064,10 +1289,10 @@@ this.handler = handler; } - protected void runMayThrow() throws ExecutionException, InterruptedException + protected void runMayThrow() { - logger.trace("LocalReadRunnable reading {}", command); - RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command)); - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start); + MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); handler.response(result); } }
