Add tests for CASSANDRA-13069
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c39a85c3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c39a85c3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c39a85c3 Branch: refs/heads/master Commit: c39a85c3e7b2869ef3fffafe1380362d6469919d Parents: 40f1365 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Thu Aug 24 00:55:12 2017 -0500 Committer: Paulo Motta <pa...@apache.org> Committed: Tue Sep 12 08:36:12 2017 -0500 ---------------------------------------------------------------------- byteman/fail_after_view_write.btm | 8 +++ byteman/fail_before_view_write.btm | 8 +++ materialized_views_test.py | 90 ++++++++++++++++++++++++++++++++- 3 files changed, 104 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c39a85c3/byteman/fail_after_view_write.btm ---------------------------------------------------------------------- diff --git a/byteman/fail_after_view_write.btm b/byteman/fail_after_view_write.btm new file mode 100644 index 0000000..b7f68b3 --- /dev/null +++ b/byteman/fail_after_view_write.btm @@ -0,0 +1,8 @@ +RULE Die before applying base mutation +CLASS org.apache.cassandra.db.view.TableViews +METHOD pushViewReplicaUpdates +AT EXIT +IF callerEquals("applyInternal") +DO + throw new RuntimeException("Dummy failure"); +ENDRULE http://git-wip-us.apache.org/repos/asf/cassandra/blob/c39a85c3/byteman/fail_before_view_write.btm ---------------------------------------------------------------------- diff --git a/byteman/fail_before_view_write.btm b/byteman/fail_before_view_write.btm new file mode 100644 index 0000000..963fc7c --- /dev/null +++ b/byteman/fail_before_view_write.btm @@ -0,0 +1,8 @@ +RULE Die before applying base mutation +CLASS org.apache.cassandra.db.view.TableViews +METHOD pushViewReplicaUpdates +AT ENTRY +IF callerEquals("applyInternal") 
+DO + throw new RuntimeException("Dummy failure"); +ENDRULE http://git-wip-us.apache.org/repos/asf/cassandra/blob/c39a85c3/materialized_views_test.py ---------------------------------------------------------------------- diff --git a/materialized_views_test.py b/materialized_views_test.py index 637124d..60dea68 100644 --- a/materialized_views_test.py +++ b/materialized_views_test.py @@ -7,7 +7,7 @@ from functools import partial from multiprocessing import Process, Queue from unittest import skip, skipIf -from cassandra import ConsistencyLevel +from cassandra import ConsistencyLevel, WriteFailure from cassandra.cluster import NoHostAvailable from cassandra.concurrent import execute_concurrent_with_args from cassandra.cluster import Cluster @@ -22,6 +22,7 @@ from dtest import Tester, debug, get_ip_from_node, create_ks from tools.assertions import (assert_all, assert_crc_check_chance_equal, assert_invalid, assert_none, assert_one, assert_unavailable) +from tools.data import rows_to_list from tools.decorators import since from tools.misc import new_node from tools.jmxutils import (JolokiaAgent, make_mbean, remove_perf_disable_shared_mem) @@ -124,10 +125,14 @@ class TestMaterializedViews(Tester): self._settle_nodes() def _replay_batchlogs(self): - debug("Replaying batchlog on all nodes") for node in self.cluster.nodelist(): if node.is_running(): + debug("Replaying batchlog on node {}".format(node.name)) node.nodetool("replaybatchlog") + # CASSANDRA-13069 - Ensure replayed mutations are removed from the batchlog + node_session = self.patient_exclusive_cql_connection(node) + result = list(node_session.execute("SELECT count(*) FROM system.batches;")) + self.assertEqual(result[0].count, 0) def create_test(self): """Test the materialized view creation""" @@ -1873,6 +1878,87 @@ class TestMaterializedViews(Tester): # node3 should have received and ignored the creation of the MV over the dropped table self.assertTrue(node3.grep_log('Not adding view users_by_state because the 
base table')) + def base_view_consistency_on_failure_after_mv_apply_test(self): + self._test_base_view_consistency_on_crash("after") + + def base_view_consistency_on_failure_before_mv_apply_test(self): + self._test_base_view_consistency_on_crash("before") + + def _test_base_view_consistency_on_crash(self, fail_phase): + """ + * Fails base table write before or after applying views + * Restart node and replay commit and batchlog + * Check that base and views are present + + @jira_ticket CASSANDRA-13069 + """ + + self.cluster.set_batch_commitlog(enabled=True) + self.ignore_log_patterns = [r'Dummy failure', r"Failed to force-recycle all segments"] + self.prepare(rf=1, install_byteman=True) + node1, node2, node3 = self.cluster.nodelist() + session = self.patient_exclusive_cql_connection(node1) + session.execute('USE ks') + + session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)") + session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t " + "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)")) + + session.cluster.control_connection.wait_for_schema_agreement() + + debug('Make node1 fail {} view writes'.format(fail_phase)) + node1.byteman_submit(['./byteman/fail_{}_view_write.btm'.format(fail_phase)]) + + debug('Write 1000 rows - all node1 writes should fail') + + failed = False + for i in xrange(1, 1000): + try: + session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) USING TIMESTAMP {v}".format(v=i)) + except WriteFailure: + failed = True + + self.assertTrue(failed, "Should fail at least once.") + self.assertTrue(node1.grep_log("Dummy failure"), "Should throw Dummy failure") + + missing_entries = 0 + session = self.patient_exclusive_cql_connection(node1) + session.execute('USE ks') + for i in xrange(1, 1000): + view_entry = rows_to_list(session.execute(SimpleStatement("SELECT * FROM t_by_v WHERE id = {} AND v = {}".format(i, i), + consistency_level=ConsistencyLevel.ONE))) + base_entry = 
rows_to_list(session.execute(SimpleStatement("SELECT * FROM t WHERE id = {}".format(i), + consistency_level=ConsistencyLevel.ONE))) + + if not base_entry: + missing_entries += 1 + if not view_entry: + missing_entries += 1 + + debug("Missing entries {}".format(missing_entries)) + self.assertTrue(missing_entries > 0, ) + + debug('Restarting node1 to ensure commit log is replayed') + node1.stop(wait_other_notice=True) + # Set batchlog.replay_timeout_in_ms=1 so we can ensure batchlog will be replayed below + node1.start(jvm_args=["-Dcassandra.batchlog.replay_timeout_in_ms=1"]) + + debug('Replay batchlogs') + time.sleep(0.001) # Wait batchlog.replay_timeout_in_ms=1 (ms) + self._replay_batchlogs() + + debug('Verify that both the base table entry and view are present after commit and batchlog replay') + session = self.patient_exclusive_cql_connection(node1) + session.execute('USE ks') + for i in xrange(1, 1000): + view_entry = rows_to_list(session.execute(SimpleStatement("SELECT * FROM t_by_v WHERE id = {} AND v = {}".format(i, i), + consistency_level=ConsistencyLevel.ONE))) + base_entry = rows_to_list(session.execute(SimpleStatement("SELECT * FROM t WHERE id = {}".format(i), + consistency_level=ConsistencyLevel.ONE))) + + self.assertTrue(base_entry, "Both base {} and view entry {} should exist.".format(base_entry, view_entry)) + self.assertTrue(view_entry, "Both base {} and view entry {} should exist.".format(base_entry, view_entry)) + # For read verification class MutationPresence(Enum): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org