Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 cab2b25b0 -> eac7781e7


Ignore Paxos commits for truncated tables

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-7538


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/17de36f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/17de36f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/17de36f2

Branch: refs/heads/cassandra-2.1
Commit: 17de36f246c912287b85eb7015583a35f5040919
Parents: 0e3d9fc
Author: Sam Tunnicliffe <s...@beobal.com>
Authored: Mon Nov 24 16:07:17 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Mon Nov 24 16:07:17 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/paxos/PaxosState.java     |  17 ++-
 .../cassandra/service/PaxosStateTest.java       | 108 +++++++++++++++++++
 3 files changed, 122 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/17de36f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 412eb59..fe23248 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Ignore Paxos commits for truncated tables (CASSANDRA-7538)
  * Validate size of indexed column values (CASSANDRA-8280)
  * Make LCS split compaction results over all data directories (CASSANDRA-8329)
  * Fix some failing queries that use multi-column relations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17de36f2/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0196122..2adecec 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class PaxosState
 {
@@ -132,10 +133,18 @@ public class PaxosState
             // Committing it is however always safe due to column timestamps, so always do it. However,
             // if our current in-progress ballot is strictly greater than the proposal one, we shouldn't
             // erase the in-progress update.
-            Tracing.trace("Committing proposal {}", proposal);
-            RowMutation rm = proposal.makeMutation();
-            Keyspace.open(rm.getKeyspaceName()).apply(rm, true);
-
+            // The table may have been truncated since the proposal was initiated. In that case, we
+            // don't want to perform the mutation and potentially resurrect truncated data
+            if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().cfId))
+            {
+                Tracing.trace("Committing proposal {}", proposal);
+                RowMutation rm = proposal.makeMutation();
+                Keyspace.open(rm.getKeyspaceName()).apply(rm, true);
+            }
+            else
+            {
+                Tracing.trace("Not committing proposal {} as ballot timestamp predates last truncation time", proposal);
+            }
             // We don't need to lock, we're just blindly updating
             SystemKeyspace.savePaxosCommit(proposal);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17de36f2/test/unit/org/apache/cassandra/service/PaxosStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/PaxosStateTest.java b/test/unit/org/apache/cassandra/service/PaxosStateTest.java
new file mode 100644
index 0000000..306c424
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/PaxosStateTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.nio.ByteBuffer;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+public class PaxosStateTest // exercises PaxosState.commit() before and after a truncation of the target table
+{
+    @BeforeClass
+    public static void setUpClass() throws Throwable // runs once: loads the test schema through SchemaLoader
+    {
+        SchemaLoader.loadSchema();
+    }
+
+    @AfterClass
+    public static void stopGossiper() // runs once after all tests: stops the Gossiper singleton
+    {
+        Gossiper.instance.stop();
+    }
+
+    @Test
+    public void testCommittingAfterTruncation() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard1");
+        DecoratedKey key = Util.dk("key" + System.nanoTime()); // nanoTime suffix: presumably keeps the key distinct between runs
+        ByteBuffer name = ByteBufferUtil.bytes("col");
+        ByteBuffer value = ByteBufferUtil.bytes(0);
+        ColumnFamily update = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        update.addColumn(name, value, FBUtilities.timestampMicros());
+
+        // Precondition: nothing is stored under this key yet
+        assertNoDataPresent(cfs, key);
+
+        // Commit a proposal whose ballot is built from timestamp 0 & verify the data is present
+        Commit beforeTruncate = newProposal(0, key.key, update);
+        PaxosState.commit(beforeTruncate);
+        assertDataPresent(cfs, key, name, value);
+
+        // Truncate, then commit the very same proposal again: the mutation should
+        // be ignored because the proposal's ballot predates the truncation
+        cfs.truncateBlocking();
+        PaxosState.commit(beforeTruncate);
+        assertNoDataPresent(cfs, key);
+
+        // Now try again with a ballot built from a timestamp just past the recorded truncation time
+        long timestamp = SystemKeyspace.getTruncatedAt(update.metadata().cfId) + 1;
+        Commit afterTruncate = newProposal(timestamp, key.key, update);
+        PaxosState.commit(afterTruncate);
+        assertDataPresent(cfs, key, name, value);
+    }
+
+    private Commit newProposal(long ballotMillis, ByteBuffer key, ColumnFamily update) // proposal with a time-UUID ballot made from ballotMillis
+    {
+        return Commit.newProposal(key, UUIDGen.getTimeUUID(ballotMillis), update);
+    }
+
+    private void assertDataPresent(ColumnFamilyStore cfs, DecoratedKey key, ByteBuffer name, ByteBuffer value) // row is non-empty and column `name` equals `value`
+    {
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfs.name, System.currentTimeMillis()));
+        assertFalse(cf.isEmpty());
+        assertEquals(0, ByteBufferUtil.compareUnsigned(value, cf.getColumn(name).value()));
+    }
+
+    private void assertNoDataPresent(ColumnFamilyStore cfs, DecoratedKey key) // reading the row for `key` must return null
+    {
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfs.name, System.currentTimeMillis()));
+        assertNull(cf);
+    }
+}

Reply via email to