Repository: cassandra-dtest Updated Branches: refs/heads/master 97529ccfb -> a5df23d10
Add initial tests for CASSANDRA-14145 Patch by Marcus Eriksson & Sam Tunnicliffe; reviewed by Jordan West for CASSANDRA-14145 closes #37 Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/a5df23d1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/a5df23d1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/a5df23d1 Branch: refs/heads/master Commit: a5df23d10af2ecdcc76ebe36649e19c93da830b6 Parents: 97529cc Author: Marcus Eriksson <[email protected]> Authored: Thu Aug 23 20:40:43 2018 +0200 Committer: Sam Tunnicliffe <[email protected]> Committed: Fri Sep 21 08:46:27 2018 +0100 ---------------------------------------------------------------------- dtest_setup.py | 9 +- repair_tests/incremental_repair_test.py | 212 +++++++++++++++++++++++++++ 2 files changed, 219 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/a5df23d1/dtest_setup.py ---------------------------------------------------------------------- diff --git a/dtest_setup.py b/dtest_setup.py index b8e1b23..756f542 100644 --- a/dtest_setup.py +++ b/dtest_setup.py @@ -391,11 +391,16 @@ class DTestSetup: # the failure detector can be quite slow in such tests with quick start/stop phi_values = {'phi_convict_threshold': 5} + # enable read time tracking of repaired data between replicas by default + repaired_data_tracking_values = {'repaired_data_tracking_for_partition_reads_enabled': 'true', + 'repaired_data_tracking_for_range_reads_enabled': 'true', + 'report_unconfirmed_repaired_data_mismatches': 'true'} + timeout = 15000 if self.cluster_options is not None and len(self.cluster_options) > 0: - values = merge_dicts(self.cluster_options, phi_values) + values = merge_dicts(self.cluster_options, phi_values, repaired_data_tracking_values) else: - values = merge_dicts(phi_values, { + values = 
merge_dicts(phi_values, repaired_data_tracking_values, { 'read_request_timeout_in_ms': timeout, 'range_request_timeout_in_ms': timeout, 'write_request_timeout_in_ms': timeout, http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/a5df23d1/repair_tests/incremental_repair_test.py ---------------------------------------------------------------------- diff --git a/repair_tests/incremental_repair_test.py b/repair_tests/incremental_repair_test.py index a4fa5a9..42c7705 100644 --- a/repair_tests/incremental_repair_test.py +++ b/repair_tests/incremental_repair_test.py @@ -18,6 +18,7 @@ from dtest import Tester, create_ks, create_cf from tools.assertions import assert_almost_equal, assert_one from tools.data import insert_c1c2 from tools.misc import new_node, ImmutableMapping +from tools.jmxutils import make_mbean, JolokiaAgent, remove_perf_disable_shared_mem since = pytest.mark.since logger = logging.getLogger(__name__) @@ -207,6 +208,7 @@ class TestIncRepair(Tester): self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -246,6 +248,7 @@ class TestIncRepair(Tester): 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -289,6 +292,7 @@ class TestIncRepair(Tester): self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -332,6 +336,7 @@ class TestIncRepair(Tester): """ # hinted handoff can create SSTable that we don't need after node3 restarted self.fixture_dtest_setup.setup_overrides.cluster_options = 
ImmutableMapping({'hinted_handoff_enabled': 'false'}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -459,6 +464,7 @@ class TestIncRepair(Tester): * Verify repairs occurred and repairedAt was updated """ self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'}) + self.init_default_config() self.cluster.populate(2).start() node1, node2 = self.cluster.nodelist() node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=2)', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', '-rate', 'threads=50']) @@ -691,6 +697,7 @@ class TestIncRepair(Tester): """ Test repaired data remains in sync after a move """ self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start() node1, node2, node3, node4 = self.cluster.nodelist() @@ -727,6 +734,7 @@ class TestIncRepair(Tester): """ Test repaired data remains in sync after a decommission """ self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(4).start() node1, node2, node3, node4 = self.cluster.nodelist() @@ -763,6 +771,7 @@ class TestIncRepair(Tester): """ Test repaired data remains in sync after a bootstrap """ self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -804,6 +813,7 @@ class TestIncRepair(Tester): self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'num_tokens': 1, 
'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -837,6 +847,7 @@ class TestIncRepair(Tester): self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -865,6 +876,7 @@ class TestIncRepair(Tester): self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -893,6 +905,7 @@ class TestIncRepair(Tester): 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500, 'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'}) + self.init_default_config() self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() @@ -918,3 +931,202 @@ class TestIncRepair(Tester): self.assertRepairedAndUnrepaired(node1, 'ks') self.assertRepairedAndUnrepaired(node2, 'ks') self.assertRepairedAndUnrepaired(node3, 'ks') + + @since('4.0') + def test_repaired_tracking_with_partition_deletes(self): + """ + check that when tracking repaired data status following a digest mismatch, + repaired data mismatches are marked as unconfirmed as we may skip sstables + after the partition deletes are encountered. 
+ @jira_ticket CASSANDRA-14145 + """ + session, node1, node2 = self.setup_for_repaired_data_tracking() + stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)") + stmt.consistency_level = ConsistencyLevel.ALL + for i in range(10): + session.execute(stmt, (i, i, i)) + + for node in self.cluster.nodelist(): + node.flush() + self.assertNoRepairedSSTables(node, 'ks') + + node1.repair(options=['ks']) + node2.stop(wait_other_notice=True) + + session.execute("delete from ks.tbl where k = 5") + + node1.flush() + node2.start(wait_other_notice=True) + + # expect unconfirmed inconsistencies as the partition deletes cause some sstables to be skipped + with JolokiaAgent(node1) as jmx: + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5", + expect_unconfirmed_inconsistencies=True) + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5", + expect_unconfirmed_inconsistencies=True) + # no digest reads for range queries so blocking read repair metric isn't incremented + # *all* sstables are read for partition ranges too, and as the repaired set is still in sync there should + # be no inconsistencies + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False) + + @since('4.0') + def test_repaired_tracking_with_varying_sstable_sets(self): + """ + verify that repaired data digests are computed over the merged data for each replica + and that the particular number of sstables on each doesn't affect the comparisons + both replicas start with the same repaired set, comprising 2 sstables. node1's is + then compacted and additional unrepaired data added (which overwrites some in the + repaired set). 
We expect the repaired digests to still match as the tracking will + force all sstables containing the partitions to be read + there are two variants of this, for single partition slice & names reads and range reads + @jira_ticket CASSANDRA-14145 + """ + session, node1, node2 = self.setup_for_repaired_data_tracking() + stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)") + stmt.consistency_level = ConsistencyLevel.ALL + for i in range(10): + session.execute(stmt, (i, i, i)) + + for node in self.cluster.nodelist(): + node.flush() + + for i in range(10,20): + session.execute(stmt, (i, i, i)) + + for node in self.cluster.nodelist(): + node.flush() + self.assertNoRepairedSSTables(node, 'ks') + + node1.repair(options=['ks']) + node2.stop(wait_other_notice=True) + + session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)") + session.execute("insert into ks.tbl (k, c, v) values (15, 15, 155)") + node1.flush() + node1.compact() + node1.compact() + node2.start(wait_other_notice=True) + + # we don't expect any inconsistencies as all repaired data is read on both replicas + with JolokiaAgent(node1) as jmx: + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5") + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5") + # no digest reads for range queries so read repair metric isn't incremented + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False) + + @since('4.0') + def test_repaired_tracking_with_mismatching_replicas(self): + """ + verify that when replicas have different repaired sets, this can be detected via the digests + computed at read time. All nodes start with the same data, but only 1 replica's sstables + are marked repaired. Then a divergence is introduced by overwriting on 1 replica only, which + is required to trigger a digest mismatch & full data read (for single partition reads). 
+ As the repaired sets are different between the replicas, but no other shortcutting occurs + (no partition tombstones or sstable skipping) and no sstables are involved in pending repair + session, we expect confirmed inconsistencies to be reported. + there are two variants of this, for single partition slice & names reads and range reads + @jira_ticket CASSANDRA-14145 + """ + session, node1, node2 = self.setup_for_repaired_data_tracking() + stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)") + stmt.consistency_level = ConsistencyLevel.ALL + for i in range(10): + session.execute(stmt, (i, i, i)) + + for node in self.cluster.nodelist(): + node.flush() + + for i in range(10,20): + session.execute(stmt, (i, i, i)) + + for node in self.cluster.nodelist(): + node.flush() + self.assertNoRepairedSSTables(node, 'ks') + + # stop node 2 and mark its sstables repaired + node2.stop(wait_other_notice=True) + node2.run_sstablerepairedset(keyspace='ks') + # before restarting node2 overwrite some data on node1 to trigger digest mismatches + session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)") + node2.start(wait_for_binary_proto=True) + + out1 = node1.run_sstablemetadata(keyspace='ks').stdout + out2 = node2.run_sstablemetadata(keyspace='ks').stdout + + # verify the repaired at times for the sstables on node1/node2 + assert all(t == 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out1)]]) + assert all(t > 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out2)]]) + + # we expect inconsistencies due to sstables being marked repaired on one replica only + # these are marked confirmed because no sessions are pending & all sstables are + # skipped due to partition deletes + with JolokiaAgent(node1) as jmx: + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5", + expect_confirmed_inconsistencies=True) + 
self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5", + expect_confirmed_inconsistencies=True) + # no digest reads for range queries so read repair metric isn't incremented + self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False) + + def setup_for_repaired_data_tracking(self): + self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500}) + self.init_default_config() + self.cluster.populate(2) + node1, node2 = self.cluster.nodelist() + remove_perf_disable_shared_mem(node1) # necessary for jmx + self.cluster.start() + + session = self.patient_exclusive_cql_connection(node1) + session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") + session.execute("CREATE TABLE ks.tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'") + return session, node1, node2 + + def query_and_check_repaired_mismatches(self, jmx, session, query, + expect_read_repair=True, + expect_unconfirmed_inconsistencies=False, + expect_confirmed_inconsistencies=False): + + rr_count = make_mbean('metrics', type='ReadRepair', name='ReconcileRead') + unconfirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesUnconfirmed,scope=tbl') + confirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesConfirmed,scope=tbl') + + rr_before = self.get_attribute_count(jmx, rr_count) + uc_before = self.get_attribute_count(jmx, unconfirmed_count) + cc_before = self.get_attribute_count(jmx, confirmed_count) + + stmt = SimpleStatement(query) + stmt.consistency_level = ConsistencyLevel.ALL + session.execute(stmt) + + rr_after = self.get_attribute_count(jmx, rr_count) + uc_after = self.get_attribute_count(jmx, unconfirmed_count) + cc_after = self.get_attribute_count(jmx, 
confirmed_count) + + logger.debug("Read Repair Count: {before}, {after}".format(before=rr_before, after=rr_after)) + logger.debug("Unconfirmed Inconsistency Count: {before}, {after}".format(before=uc_before, after=uc_after)) + logger.debug("Confirmed Inconsistency Count: {before}, {after}".format(before=cc_before, after=cc_after)) + + if expect_read_repair: + assert rr_after > rr_before + else: + assert rr_after == rr_before + + if expect_unconfirmed_inconsistencies: + assert uc_after > uc_before + else: + assert uc_after == uc_before + + if expect_confirmed_inconsistencies: + assert cc_after > cc_before + else: + assert cc_after == cc_before + + def get_attribute_count(self, jmx, bean): + # the MBean may not have been initialized, in which case Jolokia agent will return + # a HTTP 404 response. If we receive such, we know that the count can only be 0 + if jmx.has_mbean(bean): + return jmx.read_attribute(bean, 'Count') + else: + return 0 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
