Repository: cassandra Updated Branches: refs/heads/trunk 6b849da26 -> ada8d8b60
Fail on very large batch sizes patch by Carl Yeksigian; reviewed by Sankalp Kohli for CASSANDRA-8011 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ada8d8b6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ada8d8b6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ada8d8b6 Branch: refs/heads/trunk Commit: ada8d8b60b61653112073538a6335bb30027838a Parents: 6b849da Author: Jonathan Ellis <[email protected]> Authored: Wed Oct 1 12:08:29 2014 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Oct 1 12:08:29 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 ++ conf/cassandra.yaml | 3 +++ .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 20 ++++++++++++++++++++ .../cql3/statements/BatchStatement.java | 17 ++++++++++++++--- .../cassandra/service/StorageService.java | 10 ++++++++++ .../cassandra/service/StorageServiceMBean.java | 5 +++++ .../org/apache/cassandra/cql3/BatchTests.java | 12 ++++++++++++ 9 files changed, 68 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6720cc8..682f5d3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -24,6 +24,7 @@ * Shorten SSTable path (CASSANDRA-6962) * Use unsafe mutations for most unit tests (CASSANDRA-6969) * Fix race condition during calculation of pending ranges (CASSANDRA-7390) + * Fail on very large batch sizes (CASSANDRA-8011) 2.1.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 96f2c05..74ebb41 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -30,6 +30,8 @@ Upgrading GossipingPropertyFileSnitch instead. - CQL2 has been removed entirely in this release (previously deprecated in 2.0.0). Please switch to CQL3 if you haven't already done so. + - Very large batches will now be rejected (defaults to 50kb). This + can be customized by modifying batch_size_fail_threshold_in_kb. 2.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 8711625..137dc14 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -514,6 +514,9 @@ column_index_size_in_kb: 64 # Caution should be taken on increasing the size of this threshold as it can lead to node instability. batch_size_warn_threshold_in_kb: 5 +# Fail any batch exceeding this value. 50kb (10x warn threshold) by default. +batch_size_fail_threshold_in_kb: 50 + # Number of simultaneous compactions to allow, NOT including # validation "compactions" for anti-entropy repair. Simultaneous # compactions can help preserve read performance in a mixed read/write http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index bb07449..0f525cd 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -132,6 +132,7 @@ public class Config /* if the size of columns or super-columns are more than this, indexing will kick in */ public Integer column_index_size_in_kb = 64; public Integer batch_size_warn_threshold_in_kb = 5; + public volatile Integer batch_size_fail_threshold_in_kb = 50; public Integer concurrent_compactors; public volatile Integer compaction_throughput_mb_per_sec = 16; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 0310537..64d7ec8 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -619,6 +619,11 @@ public class DatabaseDescriptor } if (seedProvider.getSeeds().size() == 0) throw new ConfigurationException("The seed provider lists no seeds."); + + if (conf.batch_size_fail_threshold_in_kb == null) + { + conf.batch_size_fail_threshold_in_kb = conf.batch_size_warn_threshold_in_kb * 10; + } } private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException @@ -791,6 +796,21 @@ public class DatabaseDescriptor return conf.batch_size_warn_threshold_in_kb * 1024; } + public static long getBatchSizeFailThreshold() + { + return conf.batch_size_fail_threshold_in_kb * 1024L; + } + + public static int getBatchSizeFailThresholdInKB() + { + return conf.batch_size_fail_threshold_in_kb; + } + + public static void setBatchSizeFailThresholdInKB(int threshold) + { + conf.batch_size_fail_threshold_in_kb = threshold; + } + public static Collection<String> getInitialTokens() { return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 17d1771..8d380dc 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -23,6 +23,7 @@ import java.util.*; import com.google.common.base.Function; import com.google.common.collect.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.tracing.Tracing; import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,10 +238,11 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache * Checks batch size to ensure threshold is met. If not, a warning is logged. * @param cfs ColumnFamilies that will store the batch's mutations. */ - public static void verifyBatchSize(Iterable<ColumnFamily> cfs) + public static void verifyBatchSize(Iterable<ColumnFamily> cfs) throws InvalidRequestException { long size = 0; long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold(); + long failThreshold = DatabaseDescriptor.getBatchSizeFailThreshold(); for (ColumnFamily cf : cfs) size += cf.dataSize(); @@ -251,8 +253,17 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache for (ColumnFamily cf : cfs) ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName); - String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}."; - logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold); + String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}"; + if (size > failThreshold) + { + Tracing.trace(format, new Object[] {ksCfPairs, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)"}); + logger.error(format, ksCfPairs, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)"); + throw new InvalidRequestException(String.format("Batch too large")); + } + else if (logger.isWarnEnabled()) + { + logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, ""); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c114863..af5e975 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -4054,6 +4054,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE DatabaseDescriptor.setTombstoneFailureThreshold(threshold); } + public int getBatchSizeFailureThreshold() + { + return DatabaseDescriptor.getBatchSizeFailThresholdInKB(); + } + + public void setBatchSizeFailureThreshold(int threshold) + { + DatabaseDescriptor.setBatchSizeFailThresholdInKB(threshold); + } + public void setHintedHandoffThrottleInKB(int throttleInKB) { DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 1331b50..cc54639 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -512,6 +512,11 @@ public interface StorageServiceMBean extends NotificationEmitter /** Sets the threshold for abandoning queries with many tombstones */ public void setTombstoneFailureThreshold(int tombstoneDebugThreshold); + /** Returns the threshold for rejecting queries due to a large batch size */ + public int getBatchSizeFailureThreshold(); + /** Sets the threshold for rejecting queries due to a large batch size */ + public void setBatchSizeFailureThreshold(int batchSizeDebugThreshold); + /** Sets the hinted handoff throttle in kb per second, per delivery thread. */ public void setHintedHandoffThrottleInKB(int throttleInKB); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ada8d8b6/test/unit/org/apache/cassandra/cql3/BatchTests.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/BatchTests.java b/test/unit/org/apache/cassandra/cql3/BatchTests.java index 27d407e..4905233 100644 --- a/test/unit/org/apache/cassandra/cql3/BatchTests.java +++ b/test/unit/org/apache/cassandra/cql3/BatchTests.java @@ -122,6 +122,18 @@ public class BatchTests sendBatch(BatchStatement.Type.LOGGED, true, false); } + @Test(expected = InvalidQueryException.class) + public void testOversizedBatch() + { + int SIZE_FOR_FAILURE = 2500; + BatchStatement b = new BatchStatement(BatchStatement.Type.UNLOGGED); + for (int i = 0; i < SIZE_FOR_FAILURE; i++) + { + b.add(noncounter.bind(i, "foobar")); + } + session.execute(b); + } + public void sendBatch(BatchStatement.Type type, boolean addCounter, boolean addNonCounter)
