Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 706ba8767 -> d0e8ba494
Validate gc_grace_seconds for batchlog writes and MVs

patch by Paulo Motta; reviewed by Aleksey Yeschenko for CASSANDRA-9917

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0e8ba49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0e8ba49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0e8ba49

Branch: refs/heads/cassandra-3.0
Commit: d0e8ba4947d3e7804421869bcd1997ca6aad3840
Parents: 706ba87
Author: Paulo Motta <[email protected]>
Authored: Wed Aug 19 10:16:23 2015 -0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Mon Aug 24 17:20:09 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++
 .../org/apache/cassandra/config/CFMetaData.java |  5 ++
 .../AlterMaterializedViewStatement.java         | 11 +++-
 .../cql3/statements/AlterTableStatement.java    |  9 ++++
 .../cql3/statements/BatchStatement.java         | 55 ++++++++++++++++----
 .../CreateMaterializedViewStatement.java        | 13 +++++
 6 files changed, 87 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8d92393..930fb5a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.0-beta2
+ * Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
+
+
 3.0.0-beta1
  * Redesign secondary index API (CASSANDRA-9459, 7771, 9041)
  * Fix throwing ReadFailure instead of ReadTimeout on range queries (CASSANDRA-10125)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 
37f1f4d..be3093d 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -309,6 +309,11 @@ public final class CFMetaData return materializedViews; } + public boolean hasMaterializedViews() + { + return !materializedViews.isEmpty(); + } + public Indexes getIndexes() { return indexes; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java index acc2f90..bc4ba11 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java @@ -24,6 +24,7 @@ import org.apache.cassandra.db.view.MaterializedView; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.UnauthorizedException; +import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.transport.Event; @@ -64,7 +65,15 @@ public class AlterMaterializedViewStatement extends SchemaAlteringStatement throw new InvalidRequestException("ALTER MATERIALIZED VIEW WITH invoked, but no parameters found"); attrs.validate(); - cfm.params(attrs.asAlteredTableParams(cfm.params)); + + TableParams params = attrs.asAlteredTableParams(cfm.params); + if (params.gcGraceSeconds == 0) + { + throw new InvalidRequestException("Cannot alter gc_grace_seconds of a materialized view to 0, since this " + + "value is used to TTL undelivered updates. 
Setting gc_grace_seconds too " + + "low might cause undelivered updates to expire before being replayed."); + } + cfm.params(params); MigrationManager.announceColumnFamilyUpdate(cfm, false, isLocalOnly); return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java index da42c96..fac0c53 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java @@ -304,6 +304,15 @@ public class AlterTableStatement extends SchemaAlteringStatement TableParams params = attrs.asAlteredTableParams(cfm.params); + if (cfm.hasMaterializedViews() && params.gcGraceSeconds == 0) + { + throw new InvalidRequestException("Cannot alter gc_grace_seconds of the base table of a " + + "materialized view to 0, since this value is used to TTL " + + "undelivered updates. 
Setting gc_grace_seconds too low might " + + "cause undelivered updates to expire " + + "before being replayed."); + } + if (meta.isCounter() && params.defaultTimeToLive > 0) throw new InvalidRequestException("Cannot set default_time_to_live on a table with counters"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 5d1333c..5de4b6c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -40,7 +40,6 @@ import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; @@ -65,7 +64,16 @@ public class BatchStatement implements CQLStatement private final Attributes attrs; private final boolean hasConditions; private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class); - private static final String unloggedBatchWarning = "Unlogged batch covering {} partition{} detected against table{} {}. You should use a logged batch for atomicity, or asynchronous writes for performance."; + + private static final String UNLOGGED_BATCH_WARNING = "Unlogged batch covering {} partition{} detected " + + "against table{} {}. 
You should use a logged batch for " + + "atomicity, or asynchronous writes for performance."; + + private static final String LOGGED_BATCH_LOW_GCGS_WARNING = "Executing a LOGGED BATCH on table{} {}, configured with a " + + "gc_grace_seconds of 0. The gc_grace_seconds is used to TTL " + + "batchlog entries, so setting gc_grace_seconds too low on " + + "tables involved in an atomic batch might cause batchlog " + + "entries to expire before being replayed."; /** * Creates a new BatchStatement from a list of statements and a @@ -137,7 +145,8 @@ public class BatchStatement implements CQLStatement { if (hasConditions) throw new InvalidRequestException("Cannot provide custom timestamp for conditional BATCH"); - if (type == Type.COUNTER) + + if (isCounter()) throw new InvalidRequestException("Cannot provide custom timestamp for counter BATCH"); } @@ -152,10 +161,10 @@ public class BatchStatement implements CQLStatement if (timestampSet && statement.isTimestampSet()) throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements"); - if (type == Type.COUNTER && !statement.isCounter()) + if (isCounter() && !statement.isCounter()) throw new InvalidRequestException("Cannot include non-counter statement in a counter batch"); - if (type == Type.LOGGED && statement.isCounter()) + if (isLogged() && statement.isCounter()) throw new InvalidRequestException("Cannot include a counter statement in a logged batch"); if (statement.isCounter()) @@ -181,6 +190,16 @@ public class BatchStatement implements CQLStatement } } + private boolean isCounter() + { + return type == Type.COUNTER; + } + + private boolean isLogged() + { + return type == Type.LOGGED; + } + // The batch itself will be validated in either Parsed#prepare() - for regular CQL3 batches, // or in QueryProcessor.processBatch() - for native protocol batches. 
public void validate(ClientState state) throws InvalidRequestException @@ -197,14 +216,32 @@ public class BatchStatement implements CQLStatement private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now) throws RequestExecutionException, RequestValidationException { + Set<String> tablesWithZeroGcGs = null; + Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>(); for (int i = 0; i < statements.size(); i++) { ModificationStatement statement = statements.get(i); + if (isLogged() && statement.cfm.params.gcGraceSeconds == 0) + { + if (tablesWithZeroGcGs == null) + tablesWithZeroGcGs = new HashSet<>(); + tablesWithZeroGcGs.add(String.format("%s.%s", statement.cfm.ksName, statement.cfm.cfName)); + } QueryOptions statementOptions = options.forStatement(i); long timestamp = attrs.getTimestamp(now, statementOptions); addStatementMutations(statement, statementOptions, local, timestamp, mutations); } + + if (tablesWithZeroGcGs != null) + { + String suffix = tablesWithZeroGcGs.size() == 1 ? 
"" : "s"; + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, LOGGED_BATCH_LOW_GCGS_WARNING, + suffix, tablesWithZeroGcGs); + ClientWarn.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs }) + .getMessage()); + } + return unzipMutations(mutations); } @@ -321,7 +358,7 @@ public class BatchStatement implements CQLStatement private void verifyBatchType(Iterable<PartitionUpdate> updates) { - if (type != Type.LOGGED && Iterables.size(updates) > 1) + if (!isLogged() && Iterables.size(updates) > 1) { Set<DecoratedKey> keySet = new HashSet<>(); Set<String> tableNames = new HashSet<>(); @@ -332,11 +369,11 @@ public class BatchStatement implements CQLStatement tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName)); } - NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning, + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING, keySet.size(), keySet.size() == 1 ? "" : "s", tableNames.size() == 1 ? "" : "s", tableNames); - ClientWarn.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s", + ClientWarn.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s", tableNames.size() == 1 ? 
"" : "s", tableNames}).getMessage()); } @@ -381,7 +418,7 @@ public class BatchStatement implements CQLStatement verifyBatchSize(updates); verifyBatchType(updates); - boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1); + boolean mutateAtomic = (isLogged() && mutations.size() > 1); StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java index ec9e848..3e1a0bf 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java @@ -93,6 +93,7 @@ public class CreateMaterializedViewStatement extends SchemaAlteringStatement // - make sure that primary key does not include any collections // - make sure there is no where clause in the select statement // - make sure there is not currently a table or view + // - make sure baseTable gcGraceSeconds > 0 properties.validate(); @@ -105,11 +106,23 @@ public class CreateMaterializedViewStatement extends SchemaAlteringStatement throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace"); CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily()); + if (cfm.isCounter()) throw new InvalidRequestException("Materialized views are not supported on counter tables"); + if (cfm.isMaterializedView()) throw new InvalidRequestException("Materialized views cannot be created against other materialized views"); + if (cfm.params.gcGraceSeconds == 0) + { + throw new InvalidRequestException(String.format("Cannot create 
materialized view '%s' for base table " + + "'%s' with gc_grace_seconds of 0, since this value is " + + "used to TTL undelivered updates. Setting gc_grace_seconds" + + " too low might cause undelivered updates to expire " + + "before being replayed.", cfName.getColumnFamily(), + baseName.getColumnFamily())); + } + Set<ColumnIdentifier> included = new HashSet<>(); for (RawSelector selector : selectClause) {
