Repository: cassandra Updated Branches: refs/heads/trunk 1644e82f7 -> bf208377a
Warn on unlogged batch misuse Patch by tjake; reviewed by jbellis for CASSANDRA-9282 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebd05ddb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebd05ddb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebd05ddb Branch: refs/heads/trunk Commit: ebd05ddbe1fca8c70e9790628c0cce47327e4708 Parents: f5f5912 Author: T Jake Luciani <[email protected]> Authored: Mon May 4 14:12:53 2015 -0400 Committer: T Jake Luciani <[email protected]> Committed: Fri May 15 08:50:21 2015 -0400 ---------------------------------------------------------------------- .../cql3/statements/BatchStatement.java | 40 +++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebd05ddb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index c93bf64..6d4d3a1 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -19,10 +19,13 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.collect.*; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,10 +38,10 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.NoSpamLogger; /** * A <code>BATCH</code> statement parsed from a CQL query. - * */ public class BatchStatement implements CQLStatement { @@ -53,14 +56,15 @@ public class BatchStatement implements CQLStatement private final Attributes attrs; private final boolean hasConditions; private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class); + private static final String unloggedBatchWarning = "Unlogged batch covering {} partition{} detected against table{} {}. You should use a logged batch for atomicity, or asynchronous writes for performance."; /** * Creates a new BatchStatement from a list of statements and a * Thrift consistency level. * - * @param type type of the batch + * @param type type of the batch * @param statements a list of UpdateStatements - * @param attrs additional attributes for statement (CL, timestamp, timeToLive) + * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs) { @@ -170,13 +174,16 @@ public class BatchStatement implements CQLStatement private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations) { + // The case where all statement where on the same keyspace is pretty common if (mutations.size() == 1) return mutations.values().iterator().next().values(); + List<IMutation> ms = new ArrayList<>(); for (Map<ByteBuffer, IMutation> ksMap : mutations.values()) ms.addAll(ksMap.values()); + return ms; } @@ -214,7 +221,7 @@ public class BatchStatement implements CQLStatement } else { - mut = statement.cfm.isCounter() ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation; + mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation; } statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params); @@ -223,6 +230,7 @@ public class BatchStatement implements CQLStatement /** * Checks batch size to ensure threshold is met. If not, a warning is logged. + * * @param cfs ColumnFamilies that will store the batch's mutations. */ public static void verifyBatchSize(Iterable<ColumnFamily> cfs) @@ -237,13 +245,33 @@ public class BatchStatement implements CQLStatement { Set<String> ksCfPairs = new HashSet<>(); for (ColumnFamily cf : cfs) - ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName); + ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName)); String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}."; logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold); } } + private void verifyBatchType(Collection<? extends IMutation> mutations) + { + if (type != Type.LOGGED && mutations.size() > 1) + { + Set<String> ksCfPairs = new HashSet<>(); + Set<ByteBuffer> keySet = new HashSet<>(); + + for (IMutation im : mutations) + { + keySet.add(im.key()); + for (ColumnFamily cf : im.getColumnFamilies()) + ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName)); + } + + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning, + keySet.size(), keySet.size() == 1 ? "" : "s", + ksCfPairs.size() == 1 ? "" : "s", ksCfPairs); + } + } + public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException { return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options)); @@ -279,7 +307,9 @@ public class BatchStatement implements CQLStatement return im.getColumnFamilies(); } })); + verifyBatchSize(cfs); + verifyBatchType(mutations); boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1); StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
