http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 032765a..17a9c11 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -41,8 +41,9 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; -import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.AbstractBounds; @@ -195,13 +196,13 @@ public class StorageProxy implements StorageProxyMBean * @return null if the operation succeeds in updating the row, or the current values corresponding to conditions. * (since, if the CAS doesn't succeed, it means the current value do not match the conditions). */ - public static ColumnFamily cas(String keyspaceName, - String cfName, - ByteBuffer key, - CASRequest request, - ConsistencyLevel consistencyForPaxos, - ConsistencyLevel consistencyForCommit, - ClientState state) + public static RowIterator cas(String keyspaceName, + String cfName, + DecoratedKey key, + CASRequest request, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForCommit, + ClientState state) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException { final long start = System.nanoTime(); @@ -217,32 +218,35 @@ public class StorageProxy implements StorageProxyMBean while (System.nanoTime() - start < timeout) { // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key, consistencyForPaxos); + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); List<InetAddress> liveEndpoints = p.left; int requiredParticipants = p.right; final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); final UUID ballot = pair.left; contentions += pair.right; + // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); - long timestamp = System.currentTimeMillis(); - ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, request.readFilter()); - List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL - ? ConsistencyLevel.LOCAL_QUORUM - : ConsistencyLevel.QUORUM); - ColumnFamily current = rows.get(0).cf; + SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds()); + ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; + + FilteredPartition current; + try (RowIterator rowIter = readOne(readCommand, readConsistency)) + { + current = FilteredPartition.create(rowIter); + } + if (!request.appliesTo(current)) { Tracing.trace("CAS precondition does not match current values {}", current); - // We should not return null as this means success casWriteMetrics.conditionNotMet.inc(); - return current == null ? ArrayBackedSortedColumns.factory.create(metadata) : current; + return current.rowIterator(); } // finish the paxos round w/ the desired updates // TODO turn null updates into delete? - ColumnFamily updates = request.makeUpdates(current); + PartitionUpdate updates = request.makeUpdates(current); // Apply triggers to cas updates. A consideration here is that // triggers emit Mutations, and so a given trigger implementation @@ -251,9 +255,10 @@ public class StorageProxy implements StorageProxyMBean // validate that the generated mutations are targetted at the same // partition as the initial updates and reject (via an // InvalidRequestException) any which aren't. - updates = TriggerExecutor.instance.execute(key, updates); + updates = TriggerExecutor.instance.execute(updates); + - Commit proposal = Commit.newProposal(key, ballot, updates); + Commit proposal = Commit.newProposal(ballot, updates); Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) { @@ -305,12 +310,11 @@ public class StorageProxy implements StorageProxyMBean }; } - private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String keyspaceName, ByteBuffer key, ConsistencyLevel consistencyForPaxos) throws UnavailableException + private static Pair<List<InetAddress>, Integer> getPaxosParticipants(CFMetaData cfm, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException { - Token tk = StorageService.getPartitioner().getToken(key); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - + Token tk = key.getToken(); + List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(cfm.ksName, tk); + Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, cfm.ksName); if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) { // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only @@ -344,7 +348,7 @@ public class StorageProxy implements StorageProxyMBean * nodes have seen the mostRecentCommit. Otherwise, return null. 
*/ private static Pair<UUID, Integer> beginAndRepairPaxos(long start, - ByteBuffer key, + DecoratedKey key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, @@ -393,7 +397,7 @@ public class StorageProxy implements StorageProxyMBean casWriteMetrics.unfinishedCommit.inc(); else casReadMetrics.unfinishedCommit.inc(); - Commit refreshedInProgress = Commit.newProposal(inProgress.key, ballot, inProgress.update); + Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update); if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos)) { try @@ -451,7 +455,7 @@ public class StorageProxy implements StorageProxyMBean private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos) throws WriteTimeoutException { - PrepareCallback callback = new PrepareCallback(toPrepare.key, toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos); + PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer); for (InetAddress target : endpoints) MessagingService.instance().sendRR(message, target, callback); @@ -483,7 +487,7 @@ public class StorageProxy implements StorageProxyMBean boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName); - Token tk = StorageService.getPartitioner().getToken(proposal.key); + Token tk = proposal.update.partitionKey().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName()); @@ -604,7 +608,7 @@ public class StorageProxy implements StorageProxyMBean if (mutation instanceof CounterMutation) continue; - Token tk = StorageService.getPartitioner().getToken(mutation.key()); + Token tk = mutation.key().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName()); for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints)) @@ -699,6 +703,12 @@ public class StorageProxy implements StorageProxyMBean } } + public static boolean canDoLocalRequest(InetAddress replica) + { + return replica.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS; + } + + private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid) throws WriteTimeoutException, WriteFailureException { @@ -714,7 +724,7 @@ public class StorageProxy implements StorageProxyMBean for (InetAddress target : endpoints) { int targetVersion = MessagingService.instance().getVersion(target); - if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) + if (canDoLocalRequest(target)) { insertLocal(message.payload, handler); } @@ -743,12 +753,12 @@ public class StorageProxy implements StorageProxyMBean Keyspace.open(SystemKeyspace.NAME), null, WriteType.SIMPLE); - Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid)); - 
mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); + Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid))); + mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); MessageOut<Mutation> message = mutation.createMessage(); for (InetAddress target : endpoints) { - if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) + if (canDoLocalRequest(target)) insertLocal(message.payload, handler); else MessagingService.instance().sendRR(message, target, handler, false); @@ -793,7 +803,7 @@ public class StorageProxy implements StorageProxyMBean String keyspaceName = mutation.getKeyspaceName(); AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); + Token tk = mutation.key().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); @@ -811,7 +821,7 @@ public class StorageProxy implements StorageProxyMBean { AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy(); String keyspaceName = mutation.getKeyspaceName(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); + Token tk = mutation.key().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); @@ -903,7 +913,7 @@ public class StorageProxy implements StorageProxyMBean if (FailureDetector.instance.isAlive(destination)) { - if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) + if (canDoLocalRequest(destination)) { insertLocal = true; } else @@ -1064,7 +1074,7 @@ public class StorageProxy implements StorageProxyMBean } catch (Exception ex) { - logger.error("Failed to apply mutation locally : {}", ex.getMessage()); + logger.error("Failed to apply mutation locally : {}", ex); responseHandler.onFailure(FBUtilities.getBroadcastAddress()); } } @@ -1098,7 +1108,7 @@ public class StorageProxy implements StorageProxyMBean // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica String keyspaceName = cm.getKeyspaceName(); AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); - Token tk = StorageService.getPartitioner().getToken(cm.key()); + Token tk = cm.key().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); @@ -1123,7 +1133,7 @@ public class StorageProxy implements StorageProxyMBean * is unclear we want to mix those latencies with read latencies, so this * may be a bit involved. 
*/ - private static InetAddress findSuitableEndpoint(String keyspaceName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException + private static InetAddress findSuitableEndpoint(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException { Keyspace keyspace = Keyspace.open(keyspaceName); IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); @@ -1189,57 +1199,69 @@ public class StorageProxy implements StorageProxyMBean }; } - private static boolean systemKeyspaceQuery(List<ReadCommand> cmds) + private static boolean systemKeyspaceQuery(List<? extends ReadCommand> cmds) { for (ReadCommand cmd : cmds) - if (!cmd.ksName.equals(SystemKeyspace.NAME)) + if (!cmd.metadata().ksName.equals(SystemKeyspace.NAME)) return false; return true; } - public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) + public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel) + throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException + { + return readOne(command, consistencyLevel, null); + } + + public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state) + throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException + { + return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state), command); + } + + public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { // When using serial CL, the ClientState should be provided assert !consistencyLevel.isSerialConsistency(); - return read(commands, consistencyLevel, null); + return read(group, consistencyLevel, null); } /** * Performs the actual reading of a row out of the StorageService, fetching * a specific set of column names from a given column family. */ - public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) + public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { - if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands)) + if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands)) { readMetrics.unavailables.mark(); throw new IsBootstrappingException(); } return consistencyLevel.isSerialConsistency() - ? readWithPaxos(commands, consistencyLevel, state) - : readRegular(commands, consistencyLevel); + ? 
readWithPaxos(group, consistencyLevel, state) + : readRegular(group, consistencyLevel); } - private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) + private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state) throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException { assert state != null; + if (group.commands.size() > 1) + throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"); long start = System.nanoTime(); - List<Row> rows = null; + SinglePartitionReadCommand command = group.commands.get(0); + CFMetaData metadata = command.metadata(); + DecoratedKey key = command.partitionKey(); + PartitionIterator result = null; try { // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - if (commands.size() > 1) - throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one row at a time"); - ReadCommand command = commands.get(0); - - CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName); - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key, consistencyLevel); + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel); List<InetAddress> liveEndpoints = p.left; int requiredParticipants = p.right; @@ -1247,22 +1269,23 @@ public class StorageProxy implements StorageProxyMBean final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; + try { - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); + final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); if (pair.right > 0) casReadMetrics.contention.update(pair.right); } catch (WriteTimeoutException e) { - throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(command.ksName)), false); + throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(metadata.ksName)), false); } catch (WriteFailureException e) { throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false); } - rows = fetchRows(commands, consistencyForCommitOrFetch); + result = fetchRows(group.commands, consistencyForCommitOrFetch); } catch (UnavailableException e) { @@ -1287,23 +1310,25 @@ public class StorageProxy implements StorageProxyMBean long latency = System.nanoTime() - start; readMetrics.addNano(latency); casReadMetrics.addNano(latency); - // TODO avoid giving every command the same latency number. 
Can fix this in CASSADRA-5329 - for (ReadCommand command : commands) - Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); + Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } - return rows; + return result; } - private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) + @SuppressWarnings("resource") + private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel) throws UnavailableException, ReadFailureException, ReadTimeoutException { long start = System.nanoTime(); - List<Row> rows = null; - try { - rows = fetchRows(commands, consistencyLevel); + PartitionIterator result = fetchRows(group.commands, consistencyLevel); + // If we have more than one command, then despite each read command honoring the limit, the total result + // might not honor it and so we should enforce it + if (group.commands.size() > 1) + result = group.limits().filter(result, group.nowInSec()); + return result; } catch (UnavailableException e) { @@ -1325,11 +1350,9 @@ public class StorageProxy implements StorageProxyMBean long latency = System.nanoTime() - start; readMetrics.addNano(latency); // TODO avoid giving every command the same latency number. Can fix this in CASSADRA-5329 - for (ReadCommand command : commands) - Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); + for (ReadCommand command : group.commands) + Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } - - return rows; } /** @@ -1343,218 +1366,144 @@ public class StorageProxy implements StorageProxyMBean * 4. If the digests (if any) match the data return the data * 5. else carry out read repair by getting data from all the nodes. */ - private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel) + private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel) throws UnavailableException, ReadFailureException, ReadTimeoutException { - List<Row> rows = new ArrayList<>(initialCommands.size()); - // (avoid allocating a new list in the common case of nothing-to-retry) - List<ReadCommand> commandsToRetry = Collections.emptyList(); + int cmdCount = commands.size(); - do - { - List<ReadCommand> commands = commandsToRetry.isEmpty() ? 
initialCommands : commandsToRetry; - AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()]; + SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount]; + for (int i = 0; i < cmdCount; i++) + reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel); - if (!commandsToRetry.isEmpty()) - Tracing.trace("Retrying {} commands", commandsToRetry.size()); + for (int i = 0; i < cmdCount; i++) + reads[i].doInitialQueries(); - // send out read requests - for (int i = 0; i < commands.size(); i++) - { - ReadCommand command = commands.get(i); - assert !command.isDigestQuery(); + for (int i = 0; i < cmdCount; i++) + reads[i].maybeTryAdditionalReplicas(); - AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel); - exec.executeAsync(); - readExecutors[i] = exec; - } + for (int i = 0; i < cmdCount; i++) + reads[i].awaitResultsAndRetryOnDigestMismatch(); - for (AbstractReadExecutor exec : readExecutors) - exec.maybeTryAdditionalReplicas(); + for (int i = 0; i < cmdCount; i++) + if (!reads[i].isDone()) + reads[i].maybeAwaitFullDataRead(); - // read results and make a second pass for any digest mismatches - List<ReadCommand> repairCommands = null; - List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null; - for (AbstractReadExecutor exec: readExecutors) - { - try - { - Row row = exec.get(); - if (row != null) - { - exec.command.maybeTrim(row); - rows.add(row); - } + List<PartitionIterator> results = new ArrayList<>(cmdCount); + for (int i = 0; i < cmdCount; i++) + { + assert reads[i].isDone(); + results.add(reads[i].getResult()); + } - if (logger.isDebugEnabled()) - logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start)); - } - catch (ReadTimeoutException|ReadFailureException ex) - { - int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace())); - int responseCount = exec.handler.getReceivedCount(); - String gotData = responseCount > 0 - ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)" - : ""; - - boolean isTimeout = ex instanceof ReadTimeoutException; - if (Tracing.isTracing()) - { - Tracing.trace("{}; received {} of {} responses{}", - isTimeout ? "Timed out" : "Failed", responseCount, blockFor, gotData); - } - else if (logger.isDebugEnabled()) - { - logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? 
"timeout" : "failure"), responseCount, blockFor, gotData); - } - throw ex; - } - catch (DigestMismatchException ex) - { - Tracing.trace("Digest mismatch: {}", ex); + return PartitionIterators.concat(results); + } - ReadRepairMetrics.repairedBlocking.mark(); + private static class SinglePartitionReadLifecycle + { + private final SinglePartitionReadCommand<?> command; + private final AbstractReadExecutor executor; + private final ConsistencyLevel consistency; - // Do a full data read to resolve the correct response (and repair node that need be) - RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size()); - ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver, - ConsistencyLevel.ALL, - exec.getContactedReplicas().size(), - exec.command, - Keyspace.open(exec.command.getKeyspace()), - exec.handler.endpoints); + private PartitionIterator result; + private ReadCallback repairHandler; - if (repairCommands == null) - { - repairCommands = new ArrayList<>(); - repairResponseHandlers = new ArrayList<>(); - } - repairCommands.add(exec.command); - repairResponseHandlers.add(repairHandler); + SinglePartitionReadLifecycle(SinglePartitionReadCommand<?> command, ConsistencyLevel consistency) + { + this.command = command; + this.executor = AbstractReadExecutor.getReadExecutor(command, consistency); + this.consistency = consistency; + } - MessageOut<ReadCommand> message = exec.command.createMessage(); - for (InetAddress endpoint : exec.getContactedReplicas()) - { - Tracing.trace("Enqueuing full data read to {}", endpoint); - MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler); - } - } - } + boolean isDone() + { + return result != null; + } - commandsToRetry.clear(); + void doInitialQueries() + { + executor.executeAsync(); + } - // read the results for the digest mismatch retries - if (repairResponseHandlers != null) + void maybeTryAdditionalReplicas() + { + executor.maybeTryAdditionalReplicas(); + } + + void awaitResultsAndRetryOnDigestMismatch() throws ReadFailureException, ReadTimeoutException + { + try + { + result = executor.get(); + } + catch (DigestMismatchException ex) { - for (int i = 0; i < repairCommands.size(); i++) + Tracing.trace("Digest mismatch: {}", ex); + + ReadRepairMetrics.repairedBlocking.mark(); + + // Do a full data read to resolve the correct response (and repair node that need be) + Keyspace keyspace = Keyspace.open(command.metadata().ksName); + DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size()); + repairHandler = new ReadCallback(resolver, + ConsistencyLevel.ALL, + executor.getContactedReplicas().size(), + command, + keyspace, + executor.handler.endpoints); + + MessageOut<ReadCommand> message = command.createMessage(); + for (InetAddress endpoint : executor.getContactedReplicas()) { - ReadCommand command = repairCommands.get(i); - ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i); - - Row row; - try - { - row = handler.get(); - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // full data requested from each node here, no digests should be sent - } - catch (ReadTimeoutException e) - { - if (Tracing.isTracing()) - Tracing.trace("Timed out waiting on digest mismatch repair requests"); - else - logger.debug("Timed out waiting on digest mismatch repair requests"); - // the caught exception here will have CL.ALL from the 
repair command, - // not whatever CL the initial command was at (CASSANDRA-7947) - int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())); - throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true); - } - - RowDataResolver resolver = (RowDataResolver)handler.resolver; - try - { - // wait for the repair writes to be acknowledged, to minimize impact on any replica that's - // behind on writes in case the out-of-sync row is read multiple times in quick succession - FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException e) - { - if (Tracing.isTracing()) - Tracing.trace("Timed out waiting on digest mismatch repair acknowledgements"); - else - logger.debug("Timed out waiting on digest mismatch repair acknowledgements"); - int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())); - throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true); - } - - // retry any potential short reads - ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row); - if (retryCommand != null) - { - Tracing.trace("Issuing retry for read command"); - if (commandsToRetry == Collections.EMPTY_LIST) - commandsToRetry = new ArrayList<>(); - commandsToRetry.add(retryCommand); - continue; - } - - if (row != null) - { - command.maybeTrim(row); - rows.add(row); - } + Tracing.trace("Enqueuing full data read to {}", endpoint); + MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler); } } - } while (!commandsToRetry.isEmpty()); - - return rows; - } - - static class LocalReadRunnable extends DroppableRunnable - { - private final ReadCommand command; - private final ReadCallback<ReadResponse, Row> handler; - private final long start = System.nanoTime(); - - LocalReadRunnable(ReadCommand command, ReadCallback<ReadResponse, Row> handler) - { - super(MessagingService.Verb.READ); - this.command = command; - this.handler = handler; } - protected void runMayThrow() + void maybeAwaitFullDataRead() throws ReadTimeoutException { + // There wasn't a digest mismatch, we're good + if (repairHandler == null) + return; + + // Otherwise, get the result from the full-data read and check that it's not a short read try { - Keyspace keyspace = Keyspace.open(command.ksName); - Row r = command.getRow(keyspace); - ReadResponse result = ReadVerbHandler.getResponse(command, r); - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - handler.response(result); + result = repairHandler.get(); } - catch (Throwable t) + catch (DigestMismatchException e) { - handler.onFailure(FBUtilities.getBroadcastAddress()); - if (t instanceof TombstoneOverwhelmingException) - logger.error(t.getMessage()); + throw new AssertionError(e); // full data requested from each node here, no digests should be sent + } + catch (ReadTimeoutException e) + { + if (Tracing.isTracing()) + Tracing.trace("Timed out waiting on digest mismatch repair requests"); else - throw t; + logger.debug("Timed out waiting on digest mismatch repair requests"); + // the caught exception here will have CL.ALL from the repair command, + // not whatever CL the initial command was at (CASSANDRA-7947) + int blockFor = consistency.blockFor(Keyspace.open(command.metadata().ksName)); + throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true); } } + + PartitionIterator getResult() + { + assert result != null; + return result; + } } 
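
The rewritten fetchRows above drives every partition read through the same sequence of phases, one phase at a time across all commands. As a reading aid (not part of the patch), here is a minimal, self-contained sketch of that phase ordering; the Read interface and StubRead class below are hypothetical stand-ins for AbstractReadExecutor/SinglePartitionReadLifecycle, not Cassandra types:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ReadLifecycleSketch
    {
        // Hypothetical stand-in for SinglePartitionReadLifecycle: each read walks
        // through the same phases, and running phase N for every read before
        // phase N+1 keeps the initial queries for all partitions in flight at once.
        interface Read
        {
            void doInitialQueries();                     // fire the async data/digest requests
            void maybeTryAdditionalReplicas();           // speculative retry against slow replicas
            void awaitResultsAndRetryOnDigestMismatch(); // on mismatch, kick off a full-data read
            boolean isDone();
            void maybeAwaitFullDataRead();               // wait out the repair round, if one started
            String getResult();
        }

        static List<String> fetchRows(List<? extends Read> reads)
        {
            for (Read read : reads) read.doInitialQueries();
            for (Read read : reads) read.maybeTryAdditionalReplicas();
            for (Read read : reads) read.awaitResultsAndRetryOnDigestMismatch();
            for (Read read : reads)
                if (!read.isDone())
                    read.maybeAwaitFullDataRead();

            List<String> results = new ArrayList<>(reads.size());
            for (Read read : reads)
                results.add(read.getResult());
            return results;
        }

        public static void main(String[] args)
        {
            // Trivial in-memory stand-in: each read is "done" after the await phase.
            class StubRead implements Read
            {
                private final String partition;
                private String result;
                StubRead(String partition) { this.partition = partition; }
                public void doInitialQueries() {}
                public void maybeTryAdditionalReplicas() {}
                public void awaitResultsAndRetryOnDigestMismatch() { result = "row(" + partition + ")"; }
                public boolean isDone() { return result != null; }
                public void maybeAwaitFullDataRead() {}
                public String getResult() { return result; }
            }
            List<Read> reads = Arrays.asList(new StubRead("k1"), new StubRead("k2"));
            System.out.println(fetchRows(reads)); // [row(k1), row(k2)]
        }
    }

The point of the phasing is that a digest mismatch on one partition no longer delays the initial queries or speculative retries of the others; every full-data repair round is already in flight before anything blocks on one.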
- static class LocalRangeSliceRunnable extends DroppableRunnable + static class LocalReadRunnable extends DroppableRunnable { - private final AbstractRangeCommand command; - private final ReadCallback<RangeSliceReply, Iterable<Row>> handler; + private final ReadCommand command; + private final ReadCallback handler; private final long start = System.nanoTime(); - LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler) + LocalReadRunnable(ReadCommand command, ReadCallback handler) { - super(MessagingService.Verb.RANGE_SLICE); + super(MessagingService.Verb.READ); this.command = command; this.handler = handler; } @@ -1563,9 +1512,11 @@ public class StorageProxy implements StorageProxyMBean { try { - RangeSliceReply result = new RangeSliceReply(command.executeLocally()); + try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup)) + { + handler.response(command.createResponse(iterator)); + } MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - handler.response(result); } catch (Throwable t) { @@ -1583,7 +1534,7 @@ public class StorageProxy implements StorageProxyMBean return getLiveSortedEndpoints(keyspace, StorageService.getPartitioner().decorateKey(key)); } - private static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos) + public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos) { List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos); DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints); @@ -1603,307 +1554,322 @@ public class StorageProxy implements StorageProxyMBean } /** - * Estimate the number of result rows (either cql3 rows or storage rows, as called for by the command) per + * Estimate the number of result rows (either cql3 rows or "thrift" rows, as called for by the command) per * range in the ring based on our local data. This assumes that ranges are uniformly distributed across the cluster * and that the queried data is also uniformly distributed. */ - private static float estimateResultRowsPerRange(AbstractRangeCommand command, Keyspace keyspace) + private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace) { - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.columnFamily); - float resultRowsPerRange = Float.POSITIVE_INFINITY; - if (command.rowFilter != null && !command.rowFilter.isEmpty()) + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId); + SecondaryIndexSearcher searcher = cfs.indexManager.getBestIndexSearcherFor(command); + + float maxExpectedResults = searcher == null + ? 
command.limits().estimateTotalResults(cfs) + : searcher.highestSelectivityIndex(command.rowFilter()).estimateResultRows(); + + // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks + return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor(); + } + + private static class RangeForQuery + { + public final AbstractBounds<PartitionPosition> range; + public final List<InetAddress> liveEndpoints; + public final List<InetAddress> filteredEndpoints; + + public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddress> liveEndpoints, List<InetAddress> filteredEndpoints) { - List<SecondaryIndexSearcher> searchers = cfs.indexManager.getIndexSearchersForQuery(command.rowFilter); - if (searchers.isEmpty()) - { - resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs); - } - else - { - // Secondary index query (cql3 or otherwise). Estimate result rows based on most selective 2ary index. - for (SecondaryIndexSearcher searcher : searchers) - { - // use our own mean column count as our estimate for how many matching rows each node will have - SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter); - resultRowsPerRange = Math.min(resultRowsPerRange, highestSelectivityIndex.estimateResultRows()); - } - } + this.range = range; + this.liveEndpoints = liveEndpoints; + this.filteredEndpoints = filteredEndpoints; } - else if (!command.countCQL3Rows()) + } + + private static class RangeIterator extends AbstractIterator<RangeForQuery> + { + private final Keyspace keyspace; + private final ConsistencyLevel consistency; + private final Iterator<? extends AbstractBounds<PartitionPosition>> ranges; + private final int rangeCount; + + public RangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistency) { - // non-cql3 query - resultRowsPerRange = cfs.estimateKeys(); + this.keyspace = keyspace; + this.consistency = consistency; + + List<? extends AbstractBounds<PartitionPosition>> l = keyspace.getReplicationStrategy() instanceof LocalStrategy + ? 
command.dataRange().keyRange().unwrap() + : getRestrictedRanges(command.dataRange().keyRange()); + this.ranges = l.iterator(); + this.rangeCount = l.size(); } - else + + public int rangeCount() { - resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs); + return rangeCount; } - // adjust resultRowsPerRange by the number of tokens this node has and the replication factor for this ks - return (resultRowsPerRange / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor(); + protected RangeForQuery computeNext() + { + if (!ranges.hasNext()) + return endOfData(); + + AbstractBounds<PartitionPosition> range = ranges.next(); + List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right); + return new RangeForQuery(range, + liveEndpoints, + consistency.filterForQuery(keyspace, liveEndpoints)); + } } - private static float calculateResultRowsUsingEstimatedKeys(ColumnFamilyStore cfs) + private static class RangeMerger extends AbstractIterator<RangeForQuery> { - if (cfs.metadata.comparator.isDense()) + private final Keyspace keyspace; + private final ConsistencyLevel consistency; + private final PeekingIterator<RangeForQuery> ranges; + + private RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency) { - // one storage row per result row, so use key estimate directly - return cfs.estimateKeys(); + this.keyspace = keyspace; + this.consistency = consistency; + this.ranges = Iterators.peekingIterator(iterator); } - else + + protected RangeForQuery computeNext() { - float resultRowsPerStorageRow = ((float) cfs.getMeanColumns()) / cfs.metadata.regularColumns().size(); - return resultRowsPerStorageRow * (cfs.estimateKeys()); + if (!ranges.hasNext()) + return endOfData(); + + RangeForQuery current = ranges.next(); + + // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take + // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges + // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. + while (ranges.hasNext()) + { + // If the current range right is the min token, we should stop merging because CFS.getRangeSlice + // doesn't know how to deal with a wrapping range. + // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap + // the range if necessary and deal with it. However, we can't start sending wrapped ranges without breaking + // wire compatibility, so it's likely easier not to bother. + if (current.range.right.isMinimum()) + break; + + RangeForQuery next = ranges.peek(); + + List<InetAddress> merged = intersection(current.liveEndpoints, next.liveEndpoints); + + // Check if there are enough endpoints for the merge to be possible. 
+ if (!consistency.isSufficientLiveNodes(keyspace, merged)) + break; + + List<InetAddress> filteredMerged = consistency.filterForQuery(keyspace, merged); + + // Estimate whether merging will be a win or not + if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.filteredEndpoints, next.filteredEndpoints)) + break; + + // If we get there, merge this range and the next one + current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged); + ranges.next(); // consume the range we just merged since we've only peeked so far + } + return current; } } - public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level) - throws UnavailableException, ReadFailureException, ReadTimeoutException + private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator { - Tracing.trace("Computing ranges to query"); - long startTime = System.nanoTime(); + private final ReadCallback handler; + private PartitionIterator result; - Keyspace keyspace = Keyspace.open(command.keyspace); - List<Row> rows; - // now scan until we have enough results - try + private SingleRangeResponse(ReadCallback handler) { - int liveRowCount = 0; - boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions(); - rows = new ArrayList<>(); + this.handler = handler; + } - // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be - // expensive in clusters with vnodes) - List<? extends AbstractBounds<RowPosition>> ranges; - if (keyspace.getReplicationStrategy() instanceof LocalStrategy) - ranges = command.keyRange.unwrap(); - else - ranges = getRestrictedRanges(command.keyRange); + private void waitForResponse() throws ReadTimeoutException + { + if (result != null) + return; - // determine the number of rows to be fetched and the concurrency factor - int rowsToBeFetched = command.limit(); - int concurrencyFactor; - if (command.requiresScanningAllRanges()) + try { - // all nodes must be queried - rowsToBeFetched *= ranges.size(); - concurrencyFactor = ranges.size(); - logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - command.limit(), - ranges.size(), - concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}", - ranges.size(), concurrencyFactor); + result = handler.get(); } - else + catch (DigestMismatchException e) { - // our estimate of how many result rows there will be per-range - float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace); - // underestimate how many rows we will get per-range in order to increase the likelihood that we'll - // fetch enough rows in the first round - resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; - concurrencyFactor = resultRowsPerRange == 0.0 - ? 
1 - : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange))); - - logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - resultRowsPerRange, - command.limit(), - ranges.size(), - concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", - ranges.size(), - concurrencyFactor, - resultRowsPerRange); + throw new AssertionError(e); // no digests in range slices yet + } + } + + protected RowIterator computeNext() + { + waitForResponse(); + return result.hasNext() ? result.next() : endOfData(); + } + + public void close() + { + if (result != null) + result.close(); + } + } + + private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator + { + private final Iterator<RangeForQuery> ranges; + private final int totalRangeCount; + private final PartitionRangeReadCommand command; + private final Keyspace keyspace; + private final ConsistencyLevel consistency; - boolean haveSufficientRows = false; - int i = 0; - AbstractBounds<RowPosition> nextRange = null; - List<InetAddress> nextEndpoints = null; - List<InetAddress> nextFilteredEndpoints = null; - while (i < ranges.size()) + private final long startTime; + private CountingPartitionIterator sentQueryIterator; + + private int concurrencyFactor; + // The following two "metrics" are maintained to improve the concurrencyFactor + // when it was not good enough initially. + private int liveReturned; + private int rangesQueried; + + public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency) + { + this.command = command; + this.concurrencyFactor = concurrencyFactor; + this.startTime = System.nanoTime(); + this.ranges = new RangeMerger(ranges, keyspace, consistency); + this.totalRangeCount = ranges.rangeCount(); + this.consistency = consistency; + this.keyspace = keyspace; + } + + public RowIterator computeNext() + { + while (sentQueryIterator == null || !sentQueryIterator.hasNext()) { - List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor); - int concurrentFetchStartingIndex = i; - int concurrentRequests = 0; - while ((i - concurrentFetchStartingIndex) < concurrencyFactor) + // If we don't have more ranges to handle, we're done + if (!ranges.hasNext()) + return endOfData(); + + // else, send the next batch of concurrent queries (after having closed the previous iterator) + if (sentQueryIterator != null) { - AbstractBounds<RowPosition> range = nextRange == null - ? ranges.get(i) - : nextRange; - List<InetAddress> liveEndpoints = nextEndpoints == null - ? getLiveSortedEndpoints(keyspace, range.right) - : nextEndpoints; - List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null - ? consistency_level.filterForQuery(keyspace, liveEndpoints) - : nextFilteredEndpoints; - ++i; - ++concurrentRequests; + liveReturned += sentQueryIterator.counter().counted(); + sentQueryIterator.close(); - - // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take - // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges - // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. 
- while (i < ranges.size()) - { - nextRange = ranges.get(i); - nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right); - nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints); - - // If the current range right is the min token, we should stop merging because CFS.getRangeSlice - // don't know how to deal with a wrapping range. - // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps - // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking - // wire compatibility, so It's likely easier not to bother; - if (range.right.isMinimum()) - break; - - List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints); - - // Check if there is enough endpoint for the merge to be possible. - if (!consistency_level.isSufficientLiveNodes(keyspace, merged)) - break; - - List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged); - - // Estimate whether merging will be a win or not - if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints)) - break; - - // If we get there, merge this range and the next one - range = range.withNewRight(nextRange.right); - liveEndpoints = merged; - filteredEndpoints = filteredMerged; - ++i; - } + liveReturned += sentQueryIterator.counter().counted(); + sentQueryIterator.close(); - AbstractRangeCommand nodeCmd = command.forSubRange(range); - - // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp); - List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace))); - ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints); - handler.assureSufficientLiveNodes(); - resolver.setSources(filteredEndpoints); - if (filteredEndpoints.size() == 1 - && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()) - && OPTIMIZE_LOCAL_REQUESTS) - { - StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get()); - } - else - { - MessageOut<? 
extends AbstractRangeCommand> message = nodeCmd.createMessage(); - for (InetAddress endpoint : filteredEndpoints) - { - Tracing.trace("Enqueuing request to {}", endpoint); - MessagingService.instance().sendRRWithFailure(message, endpoint, handler); - } - } - scanHandlers.add(Pair.create(nodeCmd, handler)); + // It's not the first batch of queries and we're not done, so we can use what has been + // returned so far to improve our rows-per-range estimate and update the concurrency accordingly + updateConcurrencyFactor(); } - Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex); + sentQueryIterator = sendNextRequests(); + } - List<AsyncOneResponse> repairResponses = new ArrayList<>(); - for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers) - { - ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right; - RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver; + return sentQueryIterator.next(); + } - try - { - for (Row row : handler.get()) - { - rows.add(row); - if (countLiveRows) - liveRowCount += row.getLiveCount(command.predicate, command.timestamp); - } - repairResponses.addAll(resolver.repairResults); - } - catch (ReadTimeoutException|ReadFailureException ex) - { - // we timed out or failed waiting for responses - int blockFor = consistency_level.blockFor(keyspace); - int responseCount = resolver.responses.size(); - String gotData = responseCount > 0 - ? resolver.isDataPresent() ? " (including data)" : " (only digests)" - : ""; - - boolean isTimeout = ex instanceof ReadTimeoutException; - if (Tracing.isTracing()) - { - Tracing.trace("{}; received {} of {} responses{} for range {} of {}", - (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size()); - } - else if (logger.isDebugEnabled()) - { - logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}", - (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size()); - } - throw ex; - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // no digests in range slices yet - } + private void updateConcurrencyFactor() + { + if (liveReturned == 0) + { + // we haven't actually gotten any results, so query all remaining ranges at once + concurrencyFactor = totalRangeCount - rangesQueried; + return; + } - // if we're done, great, otherwise, move to the next range - int count = countLiveRows ? liveRowCount : rows.size(); - if (count >= rowsToBeFetched) - { - haveSufficientRows = true; - break; - } - } + // Otherwise, compute how many rows per range we got on average and pick a concurrency factor + // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries. 
+ int remainingRows = command.limits().count() - liveReturned; + float rowsPerRange = (float)liveReturned / (float)rangesQueried; + concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange))); + logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", + rowsPerRange, (int) remainingRows, concurrencyFactor); + } - try - { - FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException ex) - { - // We got all responses, but timed out while repairing - int blockFor = consistency_level.blockFor(keyspace); - if (Tracing.isTracing()) - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - else - logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true); - } + private SingleRangeResponse query(RangeForQuery toQuery) + { + PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range); + + DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size()); - if (haveSufficientRows) - return command.postReconciliationProcessing(rows); + int blockFor = consistency.blockFor(keyspace); + int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor); + List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses); + ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints); - // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor - // based on the results we've seen so far (as long as we still have ranges left to query) - if (i < ranges.size()) + handler.assureSufficientLiveNodes(); + + if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0))) + { + StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler), Tracing.instance.get()); + } + else + { + MessageOut<ReadCommand> message = rangeCommand.createMessage(); + for (InetAddress endpoint : toQuery.filteredEndpoints) { - float fetchedRows = countLiveRows ? 
liveRowCount : rows.size(); - float remainingRows = rowsToBeFetched - fetchedRows; - float actualRowsPerRange; - if (fetchedRows == 0.0) - { - // we haven't actually gotten any results, so query all remaining ranges at once - actualRowsPerRange = 0.0f; - concurrencyFactor = ranges.size() - i; - } - else - { - actualRowsPerRange = fetchedRows / i; - concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange))); - } - logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", - actualRowsPerRange, (int) remainingRows, concurrencyFactor); + Tracing.trace("Enqueuing request to {}", endpoint); + MessagingService.instance().sendRRWithFailure(message, endpoint, handler); } } + + return new SingleRangeResponse(handler); } - finally + + private CountingPartitionIterator sendNextRequests() { - long latency = System.nanoTime() - startTime; - rangeMetrics.addNano(latency); - Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); + List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor); + for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++) + { + concurrentQueries.add(query(ranges.next())); + ++rangesQueried; + } + + Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size()); + return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), command.limits(), command.nowInSec()); } - return command.postReconciliationProcessing(rows); + + public void close() + { + try + { + if (sentQueryIterator != null) + sentQueryIterator.close(); + } + finally + { + long latency = System.nanoTime() - startTime; + rangeMetrics.addNano(latency); + Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); + } + } + } + + @SuppressWarnings("resource") + public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel) + throws UnavailableException, ReadFailureException, ReadTimeoutException + { + Tracing.trace("Computing ranges to query"); + long startTime = System.nanoTime(); + + List<FilteredPartition> partitions = new ArrayList<>(); + + Keyspace keyspace = Keyspace.open(command.metadata().ksName); + RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel); + + // our estimate of how many result rows there will be per-range + float resultsPerRange = estimateResultsPerRange(command, keyspace); + // underestimate how many rows we will get per-range in order to increase the likelihood that we'll + // fetch enough rows in the first round + resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; + int concurrencyFactor = resultsPerRange == 0.0 + ? 1 + : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange))); + logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", + resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor); + Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange); + + // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally. 
+ return command.postReconciliationProcessing(command.limits().filter(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel), command.nowInSec())); } public Map<String, List<String>> getSchemaVersions()
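
The concurrency-factor arithmetic above shows up twice: getRangeSlice seeds it from the per-range estimate, and updateConcurrencyFactor re-derives it from what actually came back. Below is a self-contained sketch of both formulas with illustrative numbers; the 0.10 value for CONCURRENT_SUBREQUESTS_MARGIN is an assumption for the example, not read off this diff:

    public class ConcurrencySketch
    {
        static final float CONCURRENT_SUBREQUESTS_MARGIN = 0.10f; // assumed for illustration

        // Initial estimate (mirrors getRangeSlice): shave the margin off the
        // rows-per-range estimate, then size the first batch to cover the limit.
        static int initialConcurrency(int limit, float resultsPerRange, int rangeCount)
        {
            resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
            return resultsPerRange == 0.0f
                 ? 1
                 : Math.max(1, Math.min(rangeCount, (int) Math.ceil(limit / resultsPerRange)));
        }

        // Adaptive re-estimate (mirrors updateConcurrencyFactor): use the live
        // rows actually returned by the ranges queried so far.
        static int nextConcurrency(int limit, int liveReturned, int rangesQueried, int totalRangeCount)
        {
            if (liveReturned == 0)
                return totalRangeCount - rangesQueried; // nothing back yet: query all remaining ranges

            int remainingRows = limit - liveReturned;
            float rowsPerRange = (float) liveReturned / (float) rangesQueried;
            return Math.max(1, Math.min(totalRangeCount - rangesQueried,
                                        Math.round(remainingRows / rowsPerRange)));
        }

        public static void main(String[] args)
        {
            // LIMIT 100 over 64 ranges with ~5 rows/range estimated:
            // 5 * 0.9 = 4.5, ceil(100 / 4.5) = 23 concurrent sub-requests.
            System.out.println(initialConcurrency(100, 5.0f, 64)); // 23
            // Those 23 ranges returned only 46 live rows (2 rows/range), so the
            // next batch is sized at round((100 - 46) / 2) = 27 ranges.
            System.out.println(nextConcurrency(100, 46, 23, 64));  // 27
        }
    }

Underestimating rows-per-range up front deliberately overshoots the first batch, trading a few extra sub-requests for a better chance of satisfying the limit in a single round.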
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 25ed849..93421ce 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -67,21 +67,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; -import org.apache.cassandra.db.BatchlogManager; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.CounterMutationVerbHandler; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DefinitionsUpdateVerbHandler; -import org.apache.cassandra.db.HintedHandOffManager; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.MigrationRequestVerbHandler; -import org.apache.cassandra.db.MutationVerbHandler; -import org.apache.cassandra.db.ReadRepairVerbHandler; -import org.apache.cassandra.db.ReadVerbHandler; -import org.apache.cassandra.db.SchemaCheckVerbHandler; -import org.apache.cassandra.db.SnapshotDetailsTabularData; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.TruncateVerbHandler; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.BootStrapper; @@ -311,9 +297,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /* register the verb handlers */ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new ReadCommandVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler()); @@ -3901,7 +3887,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // Never ever do this at home. Used by tests. - IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner) + @VisibleForTesting + public IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner) { IPartitioner oldPartitioner = DatabaseDescriptor.getPartitioner(); DatabaseDescriptor.setPartitioner(newPartitioner);
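
One side effect of the unified ReadCommand visible in the StorageService hunk above: READ, RANGE_SLICE and PAGED_RANGE now all route to a single ReadCommandVerbHandler where there used to be separate ReadVerbHandler and RangeSliceVerbHandler classes. A toy illustration of that registration pattern (the Verb enum and VerbHandler interface here are simplified stand-ins, not the MessagingService API):

    import java.util.EnumMap;
    import java.util.Map;

    public class VerbDispatchSketch
    {
        enum Verb { MUTATION, READ, RANGE_SLICE, PAGED_RANGE }

        interface VerbHandler { void doVerb(String payload); }

        public static void main(String[] args)
        {
            Map<Verb, VerbHandler> handlers = new EnumMap<>(Verb.class);

            // One handler instance serves all three read-style verbs, mirroring the
            // three registerVerbHandlers(..., new ReadCommandVerbHandler()) calls above.
            VerbHandler reads = payload -> System.out.println("deserializing a ReadCommand: " + payload);
            handlers.put(Verb.READ, reads);
            handlers.put(Verb.RANGE_SLICE, reads);
            handlers.put(Verb.PAGED_RANGE, reads);

            handlers.get(Verb.RANGE_SLICE).doVerb("range query payload");
        }
    }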
