Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 706ba8767 -> d0e8ba494


Validate gc_grace_seconds for batchlog writes and MVs

patch by Paulo Motta; reviewed by Aleksey Yeschenko for CASSANDRA-9917


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0e8ba49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0e8ba49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0e8ba49

Branch: refs/heads/cassandra-3.0
Commit: d0e8ba4947d3e7804421869bcd1997ca6aad3840
Parents: 706ba87
Author: Paulo Motta <[email protected]>
Authored: Wed Aug 19 10:16:23 2015 -0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Mon Aug 24 17:20:09 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++
 .../org/apache/cassandra/config/CFMetaData.java |  5 ++
 .../AlterMaterializedViewStatement.java         | 11 +++-
 .../cql3/statements/AlterTableStatement.java    |  9 ++++
 .../cql3/statements/BatchStatement.java         | 55 ++++++++++++++++----
 .../CreateMaterializedViewStatement.java        | 13 +++++
 6 files changed, 87 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8d92393..930fb5a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.0-beta2
+ * Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
+
+
 3.0.0-beta1
  * Redesign secondary index API (CASSANDRA-9459, 7771, 9041)
  * Fix throwing ReadFailure instead of ReadTimeout on range queries 
(CASSANDRA-10125)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index 37f1f4d..be3093d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -309,6 +309,11 @@ public final class CFMetaData
         return materializedViews;
     }
 
+    public boolean hasMaterializedViews()
+    {
+        return !materializedViews.isEmpty();
+    }
+
     public Indexes getIndexes()
     {
         return indexes;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
index acc2f90..bc4ba11 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.view.MaterializedView;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.transport.Event;
@@ -64,7 +65,15 @@ public class AlterMaterializedViewStatement extends 
SchemaAlteringStatement
             throw new InvalidRequestException("ALTER MATERIALIZED VIEW WITH 
invoked, but no parameters found");
 
         attrs.validate();
-        cfm.params(attrs.asAlteredTableParams(cfm.params));
+
+        TableParams params = attrs.asAlteredTableParams(cfm.params);
+        if (params.gcGraceSeconds == 0)
+        {
+            throw new InvalidRequestException("Cannot alter gc_grace_seconds 
of a materialized view to 0, since this " +
+                                              "value is used to TTL 
undelivered updates. Setting gc_grace_seconds too " +
+                                              "low might cause undelivered 
updates to expire before being replayed.");
+        }
+        cfm.params(params);
 
         MigrationManager.announceColumnFamilyUpdate(cfm, false, isLocalOnly);
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index da42c96..fac0c53 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -304,6 +304,15 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
 
                 TableParams params = attrs.asAlteredTableParams(cfm.params);
 
+                if (cfm.hasMaterializedViews() && params.gcGraceSeconds == 0)
+                {
+                    throw new InvalidRequestException("Cannot alter 
gc_grace_seconds of the base table of a " +
+                                                      "materialized view to 0, 
since this value is used to TTL " +
+                                                      "undelivered updates. 
Setting gc_grace_seconds too low might " +
+                                                      "cause undelivered 
updates to expire " +
+                                                      "before being 
replayed.");
+                }
+
                 if (meta.isCounter() && params.defaultTimeToLive > 0)
                     throw new InvalidRequestException("Cannot set 
default_time_to_live on a table with counters");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 5d1333c..5de4b6c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
@@ -65,7 +64,16 @@ public class BatchStatement implements CQLStatement
     private final Attributes attrs;
     private final boolean hasConditions;
     private static final Logger logger = 
LoggerFactory.getLogger(BatchStatement.class);
-    private static final String unloggedBatchWarning = "Unlogged batch 
covering {} partition{} detected against table{} {}. You should use a logged 
batch for atomicity, or asynchronous writes for performance.";
+
+    private static final String UNLOGGED_BATCH_WARNING = "Unlogged batch 
covering {} partition{} detected " +
+                                                         "against table{} {}. 
You should use a logged batch for " +
+                                                         "atomicity, or 
asynchronous writes for performance.";
+
+    private static final String LOGGED_BATCH_LOW_GCGS_WARNING = "Executing a 
LOGGED BATCH on table{} {}, configured with a " +
+                                                                
"gc_grace_seconds of 0. The gc_grace_seconds is used to TTL " +
+                                                                "batchlog 
entries, so setting gc_grace_seconds too low on " +
+                                                                "tables 
involved in an atomic batch might cause batchlog " +
+                                                                "entries to 
expire before being replayed.";
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -137,7 +145,8 @@ public class BatchStatement implements CQLStatement
         {
             if (hasConditions)
                 throw new InvalidRequestException("Cannot provide custom 
timestamp for conditional BATCH");
-            if (type == Type.COUNTER)
+
+            if (isCounter())
                 throw new InvalidRequestException("Cannot provide custom 
timestamp for counter BATCH");
         }
 
@@ -152,10 +161,10 @@ public class BatchStatement implements CQLStatement
             if (timestampSet && statement.isTimestampSet())
                 throw new InvalidRequestException("Timestamp must be set 
either on BATCH or individual statements");
 
-            if (type == Type.COUNTER && !statement.isCounter())
+            if (isCounter() && !statement.isCounter())
                 throw new InvalidRequestException("Cannot include non-counter 
statement in a counter batch");
 
-            if (type == Type.LOGGED && statement.isCounter())
+            if (isLogged() && statement.isCounter())
                 throw new InvalidRequestException("Cannot include a counter 
statement in a logged batch");
 
             if (statement.isCounter())
@@ -181,6 +190,16 @@ public class BatchStatement implements CQLStatement
         }
     }
 
+    private boolean isCounter()
+    {
+        return type == Type.COUNTER;
+    }
+
+    private boolean isLogged()
+    {
+        return type == Type.LOGGED;
+    }
+
     // The batch itself will be validated in either Parsed#prepare() - for 
regular CQL3 batches,
     //   or in QueryProcessor.processBatch() - for native protocol batches.
     public void validate(ClientState state) throws InvalidRequestException
@@ -197,14 +216,32 @@ public class BatchStatement implements CQLStatement
     private Collection<? extends IMutation> getMutations(BatchQueryOptions 
options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
+        Set<String> tablesWithZeroGcGs = null;
+
         Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
+            if (isLogged() && statement.cfm.params.gcGraceSeconds == 0)
+            {
+                if (tablesWithZeroGcGs == null)
+                    tablesWithZeroGcGs = new HashSet<>();
+                tablesWithZeroGcGs.add(String.format("%s.%s", 
statement.cfm.ksName, statement.cfm.cfName));
+            }
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(now, statementOptions);
             addStatementMutations(statement, statementOptions, local, 
timestamp, mutations);
         }
+
+        if (tablesWithZeroGcGs != null)
+        {
+            String suffix = tablesWithZeroGcGs.size() == 1 ? "" : "s";
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, LOGGED_BATCH_LOW_GCGS_WARNING,
+                             suffix, tablesWithZeroGcGs);
+            
ClientWarn.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new 
Object[] { suffix, tablesWithZeroGcGs })
+                                            .getMessage());
+        }
+
         return unzipMutations(mutations);
     }
 
@@ -321,7 +358,7 @@ public class BatchStatement implements CQLStatement
 
     private void verifyBatchType(Iterable<PartitionUpdate> updates)
     {
-        if (type != Type.LOGGED && Iterables.size(updates) > 1)
+        if (!isLogged() && Iterables.size(updates) > 1)
         {
             Set<DecoratedKey> keySet = new HashSet<>();
             Set<String> tableNames = new HashSet<>();
@@ -332,11 +369,11 @@ public class BatchStatement implements CQLStatement
                 tableNames.add(String.format("%s.%s", 
update.metadata().ksName, update.metadata().cfName));
             }
 
-            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, unloggedBatchWarning,
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING,
                              keySet.size(), keySet.size() == 1 ? "" : "s",
                              tableNames.size() == 1 ? "" : "s", tableNames);
 
-            ClientWarn.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, 
new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
+            
ClientWarn.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new 
Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
                                                     tableNames.size() == 1 ? 
"" : "s", tableNames}).getMessage());
 
         }
@@ -381,7 +418,7 @@ public class BatchStatement implements CQLStatement
         verifyBatchSize(updates);
         verifyBatchType(updates);
 
-        boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
+        boolean mutateAtomic = (isLogged() && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0e8ba49/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
index ec9e848..3e1a0bf 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
@@ -93,6 +93,7 @@ public class CreateMaterializedViewStatement extends 
SchemaAlteringStatement
         //  - make sure that primary key does not include any collections
         //  - make sure there is no where clause in the select statement
         //  - make sure there is not currently a table or view
+        //  - make sure baseTable gcGraceSeconds > 0
 
         properties.validate();
 
@@ -105,11 +106,23 @@ public class CreateMaterializedViewStatement extends 
SchemaAlteringStatement
             throw new InvalidRequestException("Cannot create a materialized 
view on a table in a separate keyspace");
 
         CFMetaData cfm = 
ThriftValidation.validateColumnFamily(baseName.getKeyspace(), 
baseName.getColumnFamily());
+
         if (cfm.isCounter())
             throw new InvalidRequestException("Materialized views are not 
supported on counter tables");
+
         if (cfm.isMaterializedView())
             throw new InvalidRequestException("Materialized views cannot be 
created against other materialized views");
 
+        if (cfm.params.gcGraceSeconds == 0)
+        {
+            throw new InvalidRequestException(String.format("Cannot create 
materialized view '%s' for base table " +
+                                                            "'%s' with 
gc_grace_seconds of 0, since this value is " +
+                                                            "used to TTL 
undelivered updates. Setting gc_grace_seconds" +
+                                                            " too low might 
cause undelivered updates to expire " +
+                                                            "before being 
replayed.", cfName.getColumnFamily(),
+                                                            
baseName.getColumnFamily()));
+        }
+
         Set<ColumnIdentifier> included = new HashSet<>();
         for (RawSelector selector : selectClause)
         {

Reply via email to