CASSANDRA-10130 (#1486) * Add test case for CASSANDRA-10130
* Address comments by @sbtourist * Add more tests for index status management * Add missed `@staticmethod` annotation * Add @since annotations for 4.0 * Update failing index build failures * Fix code style removing trailing whitespaces and blank lines with whitespaces Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/50e1e7b1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/50e1e7b1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/50e1e7b1 Branch: refs/heads/master Commit: 50e1e7b13a1eef3e9347aee7806dc40569ab17ad Parents: 6847bc1 Author: Andrés de la Peña <adelap...@users.noreply.github.com> Authored: Mon Jun 26 13:18:55 2017 +0100 Committer: Philip Thompson <ptnapol...@gmail.com> Committed: Mon Jun 26 14:18:55 2017 +0200 ---------------------------------------------------------------------- byteman/index_build_failure.btm | 13 +++ secondary_indexes_test.py | 174 +++++++++++++++++++++++++++++--- sstable_generation_loading_test.py | 122 +++++++++++++++++----- 3 files changed, 271 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/50e1e7b1/byteman/index_build_failure.btm ---------------------------------------------------------------------- diff --git a/byteman/index_build_failure.btm b/byteman/index_build_failure.btm new file mode 100644 index 0000000..8f5183d --- /dev/null +++ b/byteman/index_build_failure.btm @@ -0,0 +1,13 @@ +# +# Fail the first index build by throwing a RuntimeException +# +RULE fail during index building +CLASS org.apache.cassandra.db.compaction.CompactionManager +METHOD submitIndexBuild +AT ENTRY +# set flag to only run this rule once. 
+IF NOT flagged("done") +DO + flag("done"); + throw new java.lang.RuntimeException("Index building failure") +ENDRULE http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/50e1e7b1/secondary_indexes_test.py ---------------------------------------------------------------------- diff --git a/secondary_indexes_test.py b/secondary_indexes_test.py index b73e94d..1edd30e 100644 --- a/secondary_indexes_test.py +++ b/secondary_indexes_test.py @@ -13,7 +13,7 @@ from cassandra.query import BatchStatement, SimpleStatement from dtest import (DISABLE_VNODES, OFFHEAP_MEMTABLES, DtestTimeoutError, Tester, debug, CASSANDRA_VERSION_FROM_BUILD, create_ks, create_cf) -from tools.assertions import assert_bootstrap_state, assert_invalid, assert_one, assert_row_count +from tools.assertions import assert_bootstrap_state, assert_invalid, assert_none, assert_one, assert_row_count from tools.data import index_is_built, rows_to_list from tools.decorators import since from tools.misc import new_node @@ -21,6 +21,16 @@ from tools.misc import new_node class TestSecondaryIndexes(Tester): + @staticmethod + def _index_sstables_files(node, keyspace, table, index): + files = [] + for data_dir in node.data_directories(): + data_dir = os.path.join(data_dir, keyspace) + base_tbl_dir = os.path.join(data_dir, [s for s in os.listdir(data_dir) if s.startswith(table)][0]) + index_sstables_dir = os.path.join(base_tbl_dir, '.' 
+ index) + files.extend(os.listdir(index_sstables_dir)) + return set(files) + def data_created_before_index_not_returned_in_where_query_test(self): """ @jira_ticket CASSANDRA-3367 @@ -307,14 +317,7 @@ class TestSecondaryIndexes(Tester): stmt = session.prepare('select * from standard1 where "C0" = ?') self.assertEqual(1, len(list(session.execute(stmt, [lookup_value])))) - before_files = [] - index_sstables_dirs = [] - for data_dir in node1.data_directories(): - data_dir = os.path.join(data_dir, 'keyspace1') - base_tbl_dir = os.path.join(data_dir, [s for s in os.listdir(data_dir) if s.startswith("standard1")][0]) - index_sstables_dir = os.path.join(base_tbl_dir, '.ix_c0') - before_files.extend(os.listdir(index_sstables_dir)) - index_sstables_dirs.append(index_sstables_dir) + before_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0') node1.nodetool("rebuild_index keyspace1 standard1 ix_c0") start = time.time() @@ -326,15 +329,160 @@ class TestSecondaryIndexes(Tester): else: raise DtestTimeoutError() - after_files = [] - for index_sstables_dir in index_sstables_dirs: - after_files.extend(os.listdir(index_sstables_dir)) - self.assertNotEqual(set(before_files), set(after_files)) + after_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0') + self.assertNotEqual(before_files, after_files) self.assertEqual(1, len(list(session.execute(stmt, [lookup_value])))) # verify that only the expected row is present in the build indexes table self.assertEqual(1, len(list(session.execute("""SELECT * FROM system."IndexInfo";""")))) + @since('4.0') + def test_failing_manual_rebuild_index(self): + """ + @jira_ticket CASSANDRA-10130 + + Tests the management of index status during manual index rebuilding failures. 
+ """ + + cluster = self.cluster + cluster.populate(1, install_byteman=True).start(wait_for_binary_proto=True) + node = cluster.nodelist()[0] + + session = self.patient_cql_connection(node) + create_ks(session, 'k', 1) + session.execute("CREATE TABLE k.t (k int PRIMARY KEY, v int)") + session.execute("CREATE INDEX idx ON k.t(v)") + session.execute("INSERT INTO k.t(k, v) VALUES (0, 1)") + session.execute("INSERT INTO k.t(k, v) VALUES (2, 3)") + + # Verify that the index is marked as built and it can answer queries + assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) + assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) + + # Simulate a failing index rebuild + before_files = self._index_sstables_files(node, 'k', 't', 'idx') + node.byteman_submit(['./byteman/index_build_failure.btm']) + with self.assertRaises(Exception): + node.nodetool("rebuild_index k t idx") + after_files = self._index_sstables_files(node, 'k', 't', 'idx') + + # Verify that the index is not rebuilt, not marked as built, and it still can answer queries + self.assertEqual(before_files, after_files) + assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""") + assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) + + # Restart the node to trigger the scheduled index rebuild + before_files = after_files + node.nodetool('drain') + node.stop() + cluster.start() + session = self.patient_cql_connection(node) + session.execute("USE k") + after_files = self._index_sstables_files(node, 'k', 't', 'idx') + + # Verify that, the index is rebuilt, marked as built, and it can answer queries + self.assertNotEqual(before_files, after_files) + assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) + assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) + + # Simulate another failing index rebuild + before_files = self._index_sstables_files(node, 'k', 't', 'idx') + 
node.byteman_submit(['./byteman/index_build_failure.btm']) + with self.assertRaises(Exception): + node.nodetool("rebuild_index k t idx") + after_files = self._index_sstables_files(node, 'k', 't', 'idx') + + # Verify that the index is not rebuilt, not marked as built, and it still can answer queries + self.assertEqual(before_files, after_files) + assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""") + assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) + + # Successfully rebuild the index + before_files = after_files + node.nodetool("rebuild_index k t idx") + cluster.wait_for_compactions() + after_files = self._index_sstables_files(node, 'k', 't', 'idx') + + # Verify that the index is rebuilt, marked as built, and it can answer queries + self.assertNotEqual(before_files, after_files) + assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) + assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) + + @since('4.0') + def test_drop_index_while_building(self): + """ + asserts that indexes deleted before they have been completely build are invalidated and not built after restart + """ + cluster = self.cluster + cluster.populate(1).start() + node = cluster.nodelist()[0] + session = self.patient_cql_connection(node) + + # Create some thousands of rows to guarantee a long index building + node.stress(['write', 'n=50K', 'no-warmup']) + session.execute("USE keyspace1") + + # Create an index and immediately drop it, without waiting for index building + session.execute('CREATE INDEX idx ON standard1("C0")') + session.execute('DROP INDEX idx') + cluster.wait_for_compactions() + + # Check that the index is not marked as built nor queryable + assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='keyspace1'""") + assert_invalid(session, + 'SELECT * FROM standard1 WHERE "C0" = 0x00', + 'Cannot execute this query as it might involve data filtering') + + # Restart the node to 
trigger any unexpected index rebuild + node.nodetool('drain') + node.stop() + cluster.start() + session = self.patient_cql_connection(node) + session.execute("USE keyspace1") + + # The index should remain not built nor queryable after restart + assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='keyspace1'""") + assert_invalid(session, + 'SELECT * FROM standard1 WHERE "C0" = 0x00', + 'Cannot execute this query as it might involve data filtering') + + @since('4.0') + def test_index_is_not_always_rebuilt_at_start(self): + """ + @jira_ticket CASSANDRA-10130 + + Tests that an index that is already marked as built is not rebuilt again when the node restarts. + """ + + cluster = self.cluster + cluster.populate(1, install_byteman=True).start(wait_for_binary_proto=True) + node = cluster.nodelist()[0] + + session = self.patient_cql_connection(node) + create_ks(session, 'k', 1) + session.execute("CREATE TABLE k.t (k int PRIMARY KEY, v int)") + session.execute("CREATE INDEX idx ON k.t(v)") + session.execute("INSERT INTO k.t(k, v) VALUES (0, 1)") + session.execute("INSERT INTO k.t(k, v) VALUES (2, 3)") + + # Verify that the index is marked as built and it can answer queries + assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) + assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) + + # Restart the node to trigger any undesired index rebuild + before_files = self._index_sstables_files(node, 'k', 't', 'idx') + node.nodetool('drain') + node.stop() + cluster.start() + session = self.patient_cql_connection(node) + session.execute("USE k") + after_files = self._index_sstables_files(node, 'k', 't', 'idx') + + # Verify that the index is not rebuilt, is marked as built, and it can answer queries -- NOTE(review): the assertNotEqual below contradicts "not rebuilt"; if no rebuild is expected the index sstable files should be unchanged (assertEqual(before_files, after_files)) -- confirm intent + self.assertNotEqual(before_files, after_files) + assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) + assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1]) + def 
test_multi_index_filtering_query(self): """ asserts that having multiple indexes that cover all predicates still requires ALLOW FILTERING to also be present http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/50e1e7b1/sstable_generation_loading_test.py ---------------------------------------------------------------------- diff --git a/sstable_generation_loading_test.py b/sstable_generation_loading_test.py index e04d714..37ea8e3 100644 --- a/sstable_generation_loading_test.py +++ b/sstable_generation_loading_test.py @@ -6,7 +6,8 @@ from distutils import dir_util from ccmlib import common as ccmcommon from dtest import Tester, debug, create_ks, create_cf -from tools.assertions import assert_one +from tools.assertions import assert_all, assert_none, assert_one +from tools.decorators import since # WARNING: sstableloader tests should be added to TestSSTableGenerationAndLoading (below), @@ -68,6 +69,34 @@ class BaseSStableLoaderTest(Tester): self.load_sstable_with_configuration(ks='"Keyspace1"', create_schema=create_schema_with_mv) + def copy_sstables(self, cluster, node): + for x in xrange(0, cluster.data_dir_count): + data_dir = os.path.join(node.get_path(), 'data{0}'.format(x)) + copy_root = os.path.join(node.get_path(), 'data{0}_copy'.format(x)) + for ddir in os.listdir(data_dir): + keyspace_dir = os.path.join(data_dir, ddir) + if os.path.isdir(keyspace_dir) and ddir != 'system': + copy_dir = os.path.join(copy_root, ddir) + dir_util.copy_tree(keyspace_dir, copy_dir) + + def load_sstables(self, cluster, node, ks): + cdir = node.get_install_dir() + sstableloader = os.path.join(cdir, 'bin', ccmcommon.platform_binary('sstableloader')) + env = ccmcommon.make_cassandra_env(cdir, node.get_path()) + host = node.address() + for x in xrange(0, cluster.data_dir_count): + sstablecopy_dir = os.path.join(node.get_path(), 'data{0}_copy'.format(x), ks.strip('"')) + for cf_dir in os.listdir(sstablecopy_dir): + full_cf_dir = os.path.join(sstablecopy_dir, cf_dir) + if 
os.path.isdir(full_cf_dir): + cmd_args = [sstableloader, '--nodes', host, full_cf_dir] + p = subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) + exit_status = p.wait() + debug('stdout: {out}'.format(out=p.stdout)) + debug('stderr: {err}'.format(err=p.stderr)) + self.assertEqual(0, exit_status, + "sstableloader exited with a non-zero status: {}".format(exit_status)) + def load_sstable_with_configuration(self, pre_compression=None, post_compression=None, ks="ks", create_schema=create_schema): """ tests that the sstableloader works by using it to load data. @@ -112,14 +141,7 @@ class BaseSStableLoaderTest(Tester): debug("Making a copy of the sstables") # make a copy of the sstables - for x in xrange(0, cluster.data_dir_count): - data_dir = os.path.join(node1.get_path(), 'data{0}'.format(x)) - copy_root = os.path.join(node1.get_path(), 'data{0}_copy'.format(x)) - for ddir in os.listdir(data_dir): - keyspace_dir = os.path.join(data_dir, ddir) - if os.path.isdir(keyspace_dir) and ddir != 'system': - copy_dir = os.path.join(copy_root, ddir) - dir_util.copy_tree(keyspace_dir, copy_dir) + self.copy_sstables(cluster, node1) debug("Wiping out the data and restarting cluster") # wipe out the node data. @@ -140,22 +162,7 @@ class BaseSStableLoaderTest(Tester): debug("Calling sstableloader") # call sstableloader to re-load each cf. 
- cdir = node1.get_install_dir() - sstableloader = os.path.join(cdir, 'bin', ccmcommon.platform_binary('sstableloader')) - env = ccmcommon.make_cassandra_env(cdir, node1.get_path()) - host = node1.address() - for x in xrange(0, cluster.data_dir_count): - sstablecopy_dir = os.path.join(node1.get_path(), 'data{0}_copy'.format(x), ks.strip('"')) - for cf_dir in os.listdir(sstablecopy_dir): - full_cf_dir = os.path.join(sstablecopy_dir, cf_dir) - if os.path.isdir(full_cf_dir): - cmd_args = [sstableloader, '--nodes', host, full_cf_dir] - p = subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env) - exit_status = p.wait() - debug('stdout: {out}'.format(out=p.stdout)) - debug('stderr: {err}'.format(err=p.stderr)) - self.assertEqual(0, exit_status, - "sstableloader exited with a non-zero status: {}".format(exit_status)) + self.load_sstables(cluster, node1, ks) def read_and_validate_data(session): for i in range(NUM_KEYS): @@ -287,3 +294,68 @@ class TestSSTableGenerationAndLoading(BaseSStableLoaderTest): "PRIMARY KEY (v)") self.load_sstable_with_configuration(ks='"Keyspace1"', create_schema=create_schema_with_mv) + + @since('4.0') + def sstableloader_with_failing_2i_test(self): + """ + @jira_ticket CASSANDRA-10130 + + Simulates an index building failure during SSTables load. + The table data should be loaded and the index should be marked for rebuilding during the next node start. 
+ """ + def create_schema_with_2i(session): + create_ks(session, 'k', 1) + session.execute("CREATE TABLE k.t (p int, c int, v int, PRIMARY KEY(p, c))") + session.execute("CREATE INDEX idx ON k.t(v)") + + cluster = self.cluster + cluster.populate(1, install_byteman=True).start(wait_for_binary_proto=True) + node = cluster.nodelist()[0] + + session = self.patient_cql_connection(node) + create_schema_with_2i(session) + session.execute("INSERT INTO k.t(p, c, v) VALUES (0, 1, 8)") + + # Stop node and copy SSTables + node.nodetool('drain') + node.stop() + self.copy_sstables(cluster, node) + + # Wipe out data and restart + cluster.clear() + cluster.start() + + # Restore the schema + session = self.patient_cql_connection(node) + create_schema_with_2i(session) + + # The table should exist and be empty, and the index should be empty and marked as built + assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) + assert_none(session, "SELECT * FROM k.t") + assert_none(session, "SELECT * FROM k.t WHERE v = 8") + + # Add some additional data before loading the SSTable, to check that it will be still accessible + session.execute("INSERT INTO k.t(p, c, v) VALUES (0, 2, 8)") + assert_one(session, "SELECT * FROM k.t", [0, 2, 8]) + assert_one(session, "SELECT * FROM k.t WHERE v = 8", [0, 2, 8]) + + # Load SSTables with a failure during index creation + node.byteman_submit(['./byteman/index_build_failure.btm']) + with self.assertRaises(Exception): + self.load_sstables(cluster, node, 'k') + + # Check that the index isn't marked as built and the old SSTable data has been loaded but not indexed + assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""") + assert_all(session, "SELECT * FROM k.t", [[0, 1, 8], [0, 2, 8]]) + assert_one(session, "SELECT * FROM k.t WHERE v = 8", [0, 2, 8]) + + # Restart the node to trigger index rebuild + node.nodetool('drain') + node.stop() + cluster.start() + session = 
self.patient_cql_connection(node) + + # Check that the index is marked as built and the index has been rebuilt + assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx']) + assert_all(session, "SELECT * FROM k.t", [[0, 1, 8], [0, 2, 8]]) + assert_all(session, "SELECT * FROM k.t WHERE v = 8", [[0, 1, 8], [0, 2, 8]]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org