Add tests for CASSANDRA-13069

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c39a85c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c39a85c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c39a85c3

Branch: refs/heads/master
Commit: c39a85c3e7b2869ef3fffafe1380362d6469919d
Parents: 40f1365
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Thu Aug 24 00:55:12 2017 -0500
Committer: Paulo Motta <pa...@apache.org>
Committed: Tue Sep 12 08:36:12 2017 -0500

----------------------------------------------------------------------
 byteman/fail_after_view_write.btm  |  8 +++
 byteman/fail_before_view_write.btm |  8 +++
 materialized_views_test.py         | 90 ++++++++++++++++++++++++++++++++-
 3 files changed, 104 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c39a85c3/byteman/fail_after_view_write.btm
----------------------------------------------------------------------
diff --git a/byteman/fail_after_view_write.btm b/byteman/fail_after_view_write.btm
new file mode 100644
index 0000000..b7f68b3
--- /dev/null
+++ b/byteman/fail_after_view_write.btm
@@ -0,0 +1,8 @@
+RULE Die before applying base mutation
+CLASS org.apache.cassandra.db.view.TableViews
+METHOD pushViewReplicaUpdates
+AT EXIT
+IF callerEquals("applyInternal")
+DO
+  throw new RuntimeException("Dummy failure");
+ENDRULE

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c39a85c3/byteman/fail_before_view_write.btm
----------------------------------------------------------------------
diff --git a/byteman/fail_before_view_write.btm b/byteman/fail_before_view_write.btm
new file mode 100644
index 0000000..963fc7c
--- /dev/null
+++ b/byteman/fail_before_view_write.btm
@@ -0,0 +1,8 @@
+RULE Die before applying base mutation
+CLASS org.apache.cassandra.db.view.TableViews
+METHOD pushViewReplicaUpdates
+AT ENTRY
+IF callerEquals("applyInternal")
+DO
+  throw new RuntimeException("Dummy failure");
+ENDRULE

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c39a85c3/materialized_views_test.py
----------------------------------------------------------------------
diff --git a/materialized_views_test.py b/materialized_views_test.py
index 637124d..60dea68 100644
--- a/materialized_views_test.py
+++ b/materialized_views_test.py
@@ -7,7 +7,7 @@ from functools import partial
 from multiprocessing import Process, Queue
 from unittest import skip, skipIf
 
-from cassandra import ConsistencyLevel
+from cassandra import ConsistencyLevel, WriteFailure
 from cassandra.cluster import NoHostAvailable
 from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.cluster import Cluster
@@ -22,6 +22,7 @@ from dtest import Tester, debug, get_ip_from_node, create_ks
 from tools.assertions import (assert_all, assert_crc_check_chance_equal,
                               assert_invalid, assert_none, assert_one,
                               assert_unavailable)
+from tools.data import rows_to_list
 from tools.decorators import since
 from tools.misc import new_node
 from tools.jmxutils import (JolokiaAgent, make_mbean, 
remove_perf_disable_shared_mem)
@@ -124,10 +125,14 @@ class TestMaterializedViews(Tester):
         self._settle_nodes()
 
     def _replay_batchlogs(self):
-        debug("Replaying batchlog on all nodes")
         for node in self.cluster.nodelist():
             if node.is_running():
+                debug("Replaying batchlog on node {}".format(node.name))
                 node.nodetool("replaybatchlog")
+                # CASSANDRA-13069 - Ensure replayed mutations are removed from the batchlog
+                node_session = self.patient_exclusive_cql_connection(node)
+                result = list(node_session.execute("SELECT count(*) FROM 
system.batches;"))
+                self.assertEqual(result[0].count, 0)
 
     def create_test(self):
         """Test the materialized view creation"""
@@ -1873,6 +1878,87 @@ class TestMaterializedViews(Tester):
         # node3 should have received and ignored the creation of the MV over 
the dropped table
         self.assertTrue(node3.grep_log('Not adding view users_by_state because 
the base table'))
 
+    def base_view_consistency_on_failure_after_mv_apply_test(self):
+        self._test_base_view_consistency_on_crash("after")
+
+    def base_view_consistency_on_failure_before_mv_apply_test(self):
+        self._test_base_view_consistency_on_crash("before")
+
+    def _test_base_view_consistency_on_crash(self, fail_phase):
+        """
+         * Fails base table write before or after applying views
+         * Restart node and replay commit log and batchlog
+         * Check that base and views are present
+
+         @jira_ticket CASSANDRA-13069
+        """
+
+        self.cluster.set_batch_commitlog(enabled=True)
+        self.ignore_log_patterns = [r'Dummy failure', r"Failed to 
force-recycle all segments"]
+        self.prepare(rf=1, install_byteman=True)
+        node1, node2, node3 = self.cluster.nodelist()
+        session = self.patient_exclusive_cql_connection(node1)
+        session.execute('USE ks')
+
+        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, 
v3 decimal)")
+        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
+                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY 
(v, id)"))
+
+        session.cluster.control_connection.wait_for_schema_agreement()
+
+        debug('Make node1 fail {} view writes'.format(fail_phase))
+        
node1.byteman_submit(['./byteman/fail_{}_view_write.btm'.format(fail_phase)])
+
+        debug('Write 1000 rows - all node1 writes should fail')
+
+        failed = False
+        for i in xrange(1, 1000):
+            try:
+                session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, 
{v}, 'a', 3.0) USING TIMESTAMP {v}".format(v=i))
+            except WriteFailure:
+                failed = True
+
+        self.assertTrue(failed, "Should fail at least once.")
+        self.assertTrue(node1.grep_log("Dummy failure"), "Should throw Dummy 
failure")
+
+        missing_entries = 0
+        session = self.patient_exclusive_cql_connection(node1)
+        session.execute('USE ks')
+        for i in xrange(1, 1000):
+            view_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t_by_v WHERE id = {} AND v = {}".format(i, i),
+                                                      
consistency_level=ConsistencyLevel.ONE)))
+            base_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t WHERE id = {}".format(i),
+                                                      
consistency_level=ConsistencyLevel.ONE)))
+
+            if not base_entry:
+                missing_entries += 1
+            if not view_entry:
+                missing_entries += 1
+
+        debug("Missing entries {}".format(missing_entries))
+        self.assertTrue(missing_entries > 0, )
+
+        debug('Restarting node1 to ensure commit log is replayed')
+        node1.stop(wait_other_notice=True)
+        # Set cassandra.batchlog.replay_timeout_in_ms=1 so we can ensure batchlog will be replayed below
+        node1.start(jvm_args=["-Dcassandra.batchlog.replay_timeout_in_ms=1"])
+
+        debug('Replay batchlogs')
+        time.sleep(0.001)  # Wait batchlog.replay_timeout_in_ms=1 (ms)
+        self._replay_batchlogs()
+
+        debug('Verify that both the base table entry and view are present 
after commit and batchlog replay')
+        session = self.patient_exclusive_cql_connection(node1)
+        session.execute('USE ks')
+        for i in xrange(1, 1000):
+            view_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t_by_v WHERE id = {} AND v = {}".format(i, i),
+                                                      
consistency_level=ConsistencyLevel.ONE)))
+            base_entry = rows_to_list(session.execute(SimpleStatement("SELECT 
* FROM t WHERE id = {}".format(i),
+                                                      
consistency_level=ConsistencyLevel.ONE)))
+
+            self.assertTrue(base_entry, "Both base {} and view entry {} should 
exist.".format(base_entry, view_entry))
+            self.assertTrue(view_entry, "Both base {} and view entry {} should 
exist.".format(base_entry, view_entry))
+
 
 # For read verification
 class MutationPresence(Enum):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to