Repository: cassandra
Updated Branches:
  refs/heads/trunk b12413d4e -> f8b3a1588


Don't warn on big batches if everything is in the same partition

patch by slebresne; reviewed by iamaleksey for CASSANDRA-10876


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8b3a158
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8b3a158
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8b3a158

Branch: refs/heads/trunk
Commit: f8b3a15881c411ff766425084776e2339fe6a17b
Parents: b12413d
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Thu Feb 25 14:20:29 2016 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Mar 15 10:21:29 2016 +0100

----------------------------------------------------------------------
 .../cql3/statements/BatchStatement.java         | 62 +++++++++++---------
 .../cql3/statements/CQL3CasRequest.java         |  4 --
 2 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b3a158/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 9faf73c..058969b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -262,22 +262,32 @@ public class BatchStatement implements CQLStatement
      *
      * @param updates - the batch mutations.
      */
-    public static void verifyBatchSize(Iterable<PartitionUpdate> updates) throws InvalidRequestException
+    private static void verifyBatchSize(Collection<? extends IMutation> mutations) throws InvalidRequestException
     {
+        // We only warn for batch spanning multiple mutations (#10876)
+        if (mutations.size() <= 1)
+            return;
+
         long size = 0;
         long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
         long failThreshold = DatabaseDescriptor.getBatchSizeFailThreshold();
 
-        for (PartitionUpdate update : updates)
-            size += update.dataSize();
+        for (IMutation mutation : mutations)
+        {
+            for (PartitionUpdate update : mutation.getPartitionUpdates())
+                size += update.dataSize();
+        }
 
         if (size > warnThreshold)
         {
             Set<String> tableNames = new HashSet<>();
-            for (PartitionUpdate update : updates)
-                tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+            for (IMutation mutation : mutations)
+            {
+                for (PartitionUpdate update : mutation.getPartitionUpdates())
+                    tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+            }
 
-            String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}";
+            String format = "Batch for {} is of size {}, exceeding specified threshold of {} by {}.{}";
             if (size > failThreshold)
             {
                 Tracing.trace(format, tableNames, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
@@ -292,29 +302,31 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    private void verifyBatchType(Iterable<PartitionUpdate> updates)
+    private void verifyBatchType(Collection<? extends IMutation> mutations)
     {
-        if (!isLogged() && Iterables.size(updates) > 1)
+        if (!isLogged() && mutations.size() > 1)
         {
             Set<DecoratedKey> keySet = new HashSet<>();
             Set<String> tableNames = new HashSet<>();
 
             Map<String, Collection<Range<Token>>> localTokensByKs = new HashMap<>();
             boolean localPartitionsOnly = true;
-            for (PartitionUpdate update : updates)
+            for (IMutation mutation : mutations)
             {
-                keySet.add(update.partitionKey());
-                tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+                for (PartitionUpdate update : mutation.getPartitionUpdates())
+                {
+                    keySet.add(update.partitionKey());
+                    tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+                }
 
                 if (localPartitionsOnly)
-                    localPartitionsOnly &= isPartitionLocal(localTokensByKs, update);
+                    localPartitionsOnly &= isPartitionLocal(localTokensByKs, mutation);
             }
 
             // CASSANDRA-9303: If we only have local mutations we do not warn
             if (localPartitionsOnly)
                 return;
 
-
             NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING,
                              keySet.size(), keySet.size() == 1 ? "" : "s",
                              tableNames.size() == 1 ? "" : "s", tableNames);
@@ -324,16 +336,17 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    private boolean isPartitionLocal(Map<String, Collection<Range<Token>>> localTokensByKs, PartitionUpdate update)
+    private boolean isPartitionLocal(Map<String, Collection<Range<Token>>> localTokensByKs, IMutation mutation)
     {
-        Collection<Range<Token>> localRanges = localTokensByKs.get(update.metadata().ksName);
+        String ksName = mutation.getKeyspaceName();
+        Collection<Range<Token>> localRanges = localTokensByKs.get(ksName);
         if (localRanges == null)
         {
-            localRanges = StorageService.instance.getLocalRanges(update.metadata().ksName);
-            localTokensByKs.put(update.metadata().ksName, localRanges);
+            localRanges = StorageService.instance.getLocalRanges(ksName);
+            localTokensByKs.put(ksName, localRanges);
         }
 
-        return Range.isInRanges(update.partitionKey().getToken(), localRanges);
+        return Range.isInRanges(mutation.key().getToken(), localRanges);
     }
 
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
@@ -366,17 +379,8 @@ public class BatchStatement implements CQLStatement
         if (mutations.isEmpty())
             return;
 
-        // Extract each collection of updates from it's IMutation and then lazily concatenate all of them into a single Iterable.
-        Iterable<PartitionUpdate> updates = Iterables.concat(Iterables.transform(mutations, new Function<IMutation, Collection<PartitionUpdate>>()
-        {
-            public Collection<PartitionUpdate> apply(IMutation im)
-            {
-                return im.getPartitionUpdates();
-            }
-        }));
-
-        verifyBatchSize(updates);
-        verifyBatchType(updates);
+        verifyBatchSize(mutations);
+        verifyBatchType(mutations);
 
         boolean mutateAtomic = (isLogged() && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b3a158/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 9564005..93844b3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -175,10 +175,6 @@ public class CQL3CasRequest implements CASRequest
             upd.applyUpdates(current, update);
 
         Keyspace.openAndGetStore(cfm).indexManager.validate(update);
-
-        if (isBatch)
-            BatchStatement.verifyBatchSize(Collections.singleton(update));
-
         return update;
     }
 

Reply via email to