Repository: cassandra
Updated Branches:
  refs/heads/trunk b12413d4e -> f8b3a1588
Don't warn on big batches if everything is in the same partition patch by slebresne; reviewed by iamaleksey for CASSANDRA-10876 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8b3a158 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8b3a158 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8b3a158 Branch: refs/heads/trunk Commit: f8b3a15881c411ff766425084776e2339fe6a17b Parents: b12413d Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Feb 25 14:20:29 2016 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Mar 15 10:21:29 2016 +0100 ---------------------------------------------------------------------- .../cql3/statements/BatchStatement.java | 62 +++++++++++--------- .../cql3/statements/CQL3CasRequest.java | 4 -- 2 files changed, 33 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b3a158/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 9faf73c..058969b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -262,22 +262,32 @@ public class BatchStatement implements CQLStatement * * @param updates - the batch mutations. */ - public static void verifyBatchSize(Iterable<PartitionUpdate> updates) throws InvalidRequestException + private static void verifyBatchSize(Collection<? 
extends IMutation> mutations) throws InvalidRequestException { + // We only warn for batch spanning multiple mutations (#10876) + if (mutations.size() <= 1) + return; + long size = 0; long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold(); long failThreshold = DatabaseDescriptor.getBatchSizeFailThreshold(); - for (PartitionUpdate update : updates) - size += update.dataSize(); + for (IMutation mutation : mutations) + { + for (PartitionUpdate update : mutation.getPartitionUpdates()) + size += update.dataSize(); + } if (size > warnThreshold) { Set<String> tableNames = new HashSet<>(); - for (PartitionUpdate update : updates) - tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName)); + for (IMutation mutation : mutations) + { + for (PartitionUpdate update : mutation.getPartitionUpdates()) + tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName)); + } - String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}"; + String format = "Batch for {} is of size {}, exceeding specified threshold of {} by {}.{}"; if (size > failThreshold) { Tracing.trace(format, tableNames, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)"); @@ -292,29 +302,31 @@ public class BatchStatement implements CQLStatement } } - private void verifyBatchType(Iterable<PartitionUpdate> updates) + private void verifyBatchType(Collection<? 
extends IMutation> mutations) { - if (!isLogged() && Iterables.size(updates) > 1) + if (!isLogged() && mutations.size() > 1) { Set<DecoratedKey> keySet = new HashSet<>(); Set<String> tableNames = new HashSet<>(); Map<String, Collection<Range<Token>>> localTokensByKs = new HashMap<>(); boolean localPartitionsOnly = true; - for (PartitionUpdate update : updates) + for (IMutation mutation : mutations) { - keySet.add(update.partitionKey()); - tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName)); + for (PartitionUpdate update : mutation.getPartitionUpdates()) + { + keySet.add(update.partitionKey()); + tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName)); + } if (localPartitionsOnly) - localPartitionsOnly &= isPartitionLocal(localTokensByKs, update); + localPartitionsOnly &= isPartitionLocal(localTokensByKs, mutation); } // CASSANDRA-9303: If we only have local mutations we do not warn if (localPartitionsOnly) return; - NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING, keySet.size(), keySet.size() == 1 ? "" : "s", tableNames.size() == 1 ? 
"" : "s", tableNames); @@ -324,16 +336,17 @@ public class BatchStatement implements CQLStatement } } - private boolean isPartitionLocal(Map<String, Collection<Range<Token>>> localTokensByKs, PartitionUpdate update) + private boolean isPartitionLocal(Map<String, Collection<Range<Token>>> localTokensByKs, IMutation mutation) { - Collection<Range<Token>> localRanges = localTokensByKs.get(update.metadata().ksName); + String ksName = mutation.getKeyspaceName(); + Collection<Range<Token>> localRanges = localTokensByKs.get(ksName); if (localRanges == null) { - localRanges = StorageService.instance.getLocalRanges(update.metadata().ksName); - localTokensByKs.put(update.metadata().ksName, localRanges); + localRanges = StorageService.instance.getLocalRanges(ksName); + localTokensByKs.put(ksName, localRanges); } - return Range.isInRanges(update.partitionKey().getToken(), localRanges); + return Range.isInRanges(mutation.key().getToken(), localRanges); } public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException @@ -366,17 +379,8 @@ public class BatchStatement implements CQLStatement if (mutations.isEmpty()) return; - // Extract each collection of updates from it's IMutation and then lazily concatenate all of them into a single Iterable. 
- Iterable<PartitionUpdate> updates = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<PartitionUpdate>>() - { - public Collection<PartitionUpdate> apply(IMutation im) - { - return im.getPartitionUpdates(); - } - })); - - verifyBatchSize(updates); - verifyBatchType(updates); + verifyBatchSize(mutations); + verifyBatchType(mutations); boolean mutateAtomic = (isLogged() && mutations.size() > 1); StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b3a158/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 9564005..93844b3 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -175,10 +175,6 @@ public class CQL3CasRequest implements CASRequest upd.applyUpdates(current, update); Keyspace.openAndGetStore(cfm).indexManager.validate(update); - - if (isBatch) - BatchStatement.verifyBatchSize(Collections.singleton(update)); - return update; }