CASSANDRA-10130 (#1486)

* Add test case for CASSANDRA-10130

* Address comments by @sbtourist

* Add more tests for index status management

* Add missing `@staticmethod` annotation

* Add @since annotations for 4.0

* Update tests for failing index builds

* Fix code style by removing trailing whitespace and whitespace-only blank lines


Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/50e1e7b1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/50e1e7b1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/50e1e7b1

Branch: refs/heads/master
Commit: 50e1e7b13a1eef3e9347aee7806dc40569ab17ad
Parents: 6847bc1
Author: Andrés de la Peña <adelap...@users.noreply.github.com>
Authored: Mon Jun 26 13:18:55 2017 +0100
Committer: Philip Thompson <ptnapol...@gmail.com>
Committed: Mon Jun 26 14:18:55 2017 +0200

----------------------------------------------------------------------
 byteman/index_build_failure.btm    |  13 +++
 secondary_indexes_test.py          | 174 +++++++++++++++++++++++++++++---
 sstable_generation_loading_test.py | 122 +++++++++++++++++-----
 3 files changed, 271 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/50e1e7b1/byteman/index_build_failure.btm
----------------------------------------------------------------------
diff --git a/byteman/index_build_failure.btm b/byteman/index_build_failure.btm
new file mode 100644
index 0000000..8f5183d
--- /dev/null
+++ b/byteman/index_build_failure.btm
@@ -0,0 +1,13 @@
+#
+# Fail the first index build submitted to the CompactionManager
+#
+RULE fail during index building
+CLASS org.apache.cassandra.db.compaction.CompactionManager
+METHOD submitIndexBuild
+AT ENTRY
+# set flag to only run this rule once.
+IF NOT flagged("done")
+DO
+   flag("done");
+   throw new java.lang.RuntimeException("Index building failure")
+ENDRULE

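Note on the rule above: it fires once, at entry to CompactionManager.submitIndexBuild, and
throws a RuntimeException so that the next index build submitted after the rule is installed
fails; because of the flag("done") guard, later builds proceed normally. A minimal sketch of
how the tests below drive it (assuming a ccm cluster started with install_byteman=True;
keyspace/table/index names are placeholders):

    node.byteman_submit(['./byteman/index_build_failure.btm'])
    with self.assertRaises(Exception):   # ccm raises on a non-zero nodetool exit status
        node.nodetool('rebuild_index ks tbl idx')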
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/50e1e7b1/secondary_indexes_test.py
----------------------------------------------------------------------
diff --git a/secondary_indexes_test.py b/secondary_indexes_test.py
index b73e94d..1edd30e 100644
--- a/secondary_indexes_test.py
+++ b/secondary_indexes_test.py
@@ -13,7 +13,7 @@ from cassandra.query import BatchStatement, SimpleStatement
 
 from dtest import (DISABLE_VNODES, OFFHEAP_MEMTABLES, DtestTimeoutError,
                    Tester, debug, CASSANDRA_VERSION_FROM_BUILD, create_ks, create_cf)
-from tools.assertions import assert_bootstrap_state, assert_invalid, assert_one, assert_row_count
+from tools.assertions import assert_bootstrap_state, assert_invalid, assert_none, assert_one, assert_row_count
 from tools.data import index_is_built, rows_to_list
 from tools.decorators import since
 from tools.misc import new_node
@@ -21,6 +21,16 @@ from tools.misc import new_node
 
 class TestSecondaryIndexes(Tester):
 
+    @staticmethod
+    def _index_sstables_files(node, keyspace, table, index):
+        files = []
+        for data_dir in node.data_directories():
+            data_dir = os.path.join(data_dir, keyspace)
+            base_tbl_dir = os.path.join(data_dir, [s for s in os.listdir(data_dir) if s.startswith(table)][0])
+            index_sstables_dir = os.path.join(base_tbl_dir, '.' + index)
+            files.extend(os.listdir(index_sstables_dir))
+        return set(files)
+
     def data_created_before_index_not_returned_in_where_query_test(self):
         """
         @jira_ticket CASSANDRA-3367
@@ -307,14 +317,7 @@ class TestSecondaryIndexes(Tester):
 
         stmt = session.prepare('select * from standard1 where "C0" = ?')
         self.assertEqual(1, len(list(session.execute(stmt, [lookup_value]))))
-        before_files = []
-        index_sstables_dirs = []
-        for data_dir in node1.data_directories():
-            data_dir = os.path.join(data_dir, 'keyspace1')
-            base_tbl_dir = os.path.join(data_dir, [s for s in os.listdir(data_dir) if s.startswith("standard1")][0])
-            index_sstables_dir = os.path.join(base_tbl_dir, '.ix_c0')
-            before_files.extend(os.listdir(index_sstables_dir))
-            index_sstables_dirs.append(index_sstables_dir)
+        before_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0')
 
         node1.nodetool("rebuild_index keyspace1 standard1 ix_c0")
         start = time.time()
@@ -326,15 +329,160 @@ class TestSecondaryIndexes(Tester):
         else:
             raise DtestTimeoutError()
 
-        after_files = []
-        for index_sstables_dir in index_sstables_dirs:
-            after_files.extend(os.listdir(index_sstables_dir))
-        self.assertNotEqual(set(before_files), set(after_files))
+        after_files = self._index_sstables_files(node1, 'keyspace1', 'standard1', 'ix_c0')
+        self.assertNotEqual(before_files, after_files)
         self.assertEqual(1, len(list(session.execute(stmt, [lookup_value]))))
 
         # verify that only the expected row is present in the built indexes table
         self.assertEqual(1, len(list(session.execute("""SELECT * FROM system."IndexInfo";"""))))
 
+    @since('4.0')
+    def test_failing_manual_rebuild_index(self):
+        """
+        @jira_ticket CASSANDRA-10130
+
+        Tests the management of index status during manual index rebuilding failures.
+        """
+
+        cluster = self.cluster
+        cluster.populate(1, install_byteman=True).start(wait_for_binary_proto=True)
+        node = cluster.nodelist()[0]
+
+        session = self.patient_cql_connection(node)
+        create_ks(session, 'k', 1)
+        session.execute("CREATE TABLE k.t (k int PRIMARY KEY, v int)")
+        session.execute("CREATE INDEX idx ON k.t(v)")
+        session.execute("INSERT INTO k.t(k, v) VALUES (0, 1)")
+        session.execute("INSERT INTO k.t(k, v) VALUES (2, 3)")
+
+        # Verify that the index is marked as built and it can answer queries
+        assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
+        assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
+
+        # Simulate a failing index rebuild
+        before_files = self._index_sstables_files(node, 'k', 't', 'idx')
+        node.byteman_submit(['./byteman/index_build_failure.btm'])
+        with self.assertRaises(Exception):
+            node.nodetool("rebuild_index k t idx")
+        after_files = self._index_sstables_files(node, 'k', 't', 'idx')
+
+        # Verify that the index is not rebuilt, not marked as built, and it can still answer queries
+        self.assertEqual(before_files, after_files)
+        assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""")
+        assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
+
+        # Restart the node to trigger the scheduled index rebuild
+        before_files = after_files
+        node.nodetool('drain')
+        node.stop()
+        cluster.start()
+        session = self.patient_cql_connection(node)
+        session.execute("USE k")
+        after_files = self._index_sstables_files(node, 'k', 't', 'idx')
+
+        # Verify that the index is rebuilt, marked as built, and it can answer queries
+        self.assertNotEqual(before_files, after_files)
+        assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
+        assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
+
+        # Simulate another failing index rebuild
+        before_files = self._index_sstables_files(node, 'k', 't', 'idx')
+        node.byteman_submit(['./byteman/index_build_failure.btm'])
+        with self.assertRaises(Exception):
+            node.nodetool("rebuild_index k t idx")
+        after_files = self._index_sstables_files(node, 'k', 't', 'idx')
+
+        # Verify that the index is not rebuilt, not marked as built, and it can still answer queries
+        self.assertEqual(before_files, after_files)
+        assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""")
+        assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
+
+        # Successfully rebuild the index
+        before_files = after_files
+        node.nodetool("rebuild_index k t idx")
+        cluster.wait_for_compactions()
+        after_files = self._index_sstables_files(node, 'k', 't', 'idx')
+
+        # Verify that the index is rebuilt, marked as built, and it can answer queries
+        self.assertNotEqual(before_files, after_files)
+        assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
+        assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
+
+    @since('4.0')
+    def test_drop_index_while_building(self):
+        """
+        asserts that indexes deleted before they have been completely built are invalidated and not built after restart
+        """
+        cluster = self.cluster
+        cluster.populate(1).start()
+        node = cluster.nodelist()[0]
+        session = self.patient_cql_connection(node)
+
+        # Write several thousand rows to guarantee a long index build
+        node.stress(['write', 'n=50K', 'no-warmup'])
+        session.execute("USE keyspace1")
+
+        # Create an index and immediately drop it, without waiting for index building
+        session.execute('CREATE INDEX idx ON standard1("C0")')
+        session.execute('DROP INDEX idx')
+        cluster.wait_for_compactions()
+
+        # Check that the index is not marked as built nor queryable
+        assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='keyspace1'""")
+        assert_invalid(session,
+                       'SELECT * FROM standard1 WHERE "C0" = 0x00',
+                       'Cannot execute this query as it might involve data filtering')
+
+        # Restart the node to check that no unexpected index rebuild is triggered
+        node.nodetool('drain')
+        node.stop()
+        cluster.start()
+        session = self.patient_cql_connection(node)
+        session.execute("USE keyspace1")
+
+        # The index should remain neither built nor queryable after restart
+        assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='keyspace1'""")
+        assert_invalid(session,
+                       'SELECT * FROM standard1 WHERE "C0" = 0x00',
+                       'Cannot execute this query as it might involve data filtering')
+
+    @since('4.0')
+    def test_index_is_not_always_rebuilt_at_start(self):
+        """
+        @jira_ticket CASSANDRA-10130
+
+        Tests that an index that is already built is not rebuilt when the node restarts.
+        """
+
+        cluster = self.cluster
+        cluster.populate(1, install_byteman=True).start(wait_for_binary_proto=True)
+        node = cluster.nodelist()[0]
+
+        session = self.patient_cql_connection(node)
+        create_ks(session, 'k', 1)
+        session.execute("CREATE TABLE k.t (k int PRIMARY KEY, v int)")
+        session.execute("CREATE INDEX idx ON k.t(v)")
+        session.execute("INSERT INTO k.t(k, v) VALUES (0, 1)")
+        session.execute("INSERT INTO k.t(k, v) VALUES (2, 3)")
+
+        # Verify that the index is marked as built and it can answer queries
+        assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
+        assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
+
+        # Restart the node to check that no undesired index rebuild is triggered
+        before_files = self._index_sstables_files(node, 'k', 't', 'idx')
+        node.nodetool('drain')
+        node.stop()
+        cluster.start()
+        session = self.patient_cql_connection(node)
+        session.execute("USE k")
+        after_files = self._index_sstables_files(node, 'k', 't', 'idx')
+
+        # Verify that the index is not rebuilt, marked as built, and it can answer queries
+        self.assertNotEqual(before_files, after_files)
+        assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
+        assert_one(session, "SELECT * FROM k.t WHERE v = 1", [0, 1])
+
     def test_multi_index_filtering_query(self):
         """
         asserts that having multiple indexes that cover all predicates still requires ALLOW FILTERING to also be present

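Note: the assertions above treat a row in system."IndexInfo" as the marker that an index has
been built. A minimal standalone sketch of the same check, assuming a plain cassandra-driver
session outside the dtest harness (the helper name is hypothetical):

    def index_is_marked_built(session, keyspace_name, index_name):
        # system."IndexInfo" stores one row per built index, keyed by keyspace name
        # (the partition key is called table_name for historical reasons).
        rows = session.execute('SELECT index_name FROM system."IndexInfo" WHERE table_name = %s',
                               [keyspace_name])
        return any(row.index_name == index_name for row in rows)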
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/50e1e7b1/sstable_generation_loading_test.py
----------------------------------------------------------------------
diff --git a/sstable_generation_loading_test.py b/sstable_generation_loading_test.py
index e04d714..37ea8e3 100644
--- a/sstable_generation_loading_test.py
+++ b/sstable_generation_loading_test.py
@@ -6,7 +6,8 @@ from distutils import dir_util
 from ccmlib import common as ccmcommon
 
 from dtest import Tester, debug, create_ks, create_cf
-from tools.assertions import assert_one
+from tools.assertions import assert_all, assert_none, assert_one
+from tools.decorators import since
 
 
 # WARNING: sstableloader tests should be added to TestSSTableGenerationAndLoading (below),
@@ -68,6 +69,34 @@ class BaseSStableLoaderTest(Tester):
 
         self.load_sstable_with_configuration(ks='"Keyspace1"', create_schema=create_schema_with_mv)
 
+    def copy_sstables(self, cluster, node):
+        for x in xrange(0, cluster.data_dir_count):
+            data_dir = os.path.join(node.get_path(), 'data{0}'.format(x))
+            copy_root = os.path.join(node.get_path(), 'data{0}_copy'.format(x))
+            for ddir in os.listdir(data_dir):
+                keyspace_dir = os.path.join(data_dir, ddir)
+                if os.path.isdir(keyspace_dir) and ddir != 'system':
+                    copy_dir = os.path.join(copy_root, ddir)
+                    dir_util.copy_tree(keyspace_dir, copy_dir)
+
+    def load_sstables(self, cluster, node, ks):
+        cdir = node.get_install_dir()
+        sstableloader = os.path.join(cdir, 'bin', ccmcommon.platform_binary('sstableloader'))
+        env = ccmcommon.make_cassandra_env(cdir, node.get_path())
+        host = node.address()
+        for x in xrange(0, cluster.data_dir_count):
+            sstablecopy_dir = os.path.join(node.get_path(), 'data{0}_copy'.format(x), ks.strip('"'))
+            for cf_dir in os.listdir(sstablecopy_dir):
+                full_cf_dir = os.path.join(sstablecopy_dir, cf_dir)
+                if os.path.isdir(full_cf_dir):
+                    cmd_args = [sstableloader, '--nodes', host, full_cf_dir]
+                    p = subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env)
+                    exit_status = p.wait()
+                    debug('stdout: {out}'.format(out=p.stdout))
+                    debug('stderr: {err}'.format(err=p.stderr))
+                    self.assertEqual(0, exit_status,
+                                     "sstableloader exited with a non-zero status: {}".format(exit_status))
+
     def load_sstable_with_configuration(self, pre_compression=None, post_compression=None, ks="ks", create_schema=create_schema):
         """
         tests that the sstableloader works by using it to load data.
@@ -112,14 +141,7 @@ class BaseSStableLoaderTest(Tester):
 
         debug("Making a copy of the sstables")
         # make a copy of the sstables
-        for x in xrange(0, cluster.data_dir_count):
-            data_dir = os.path.join(node1.get_path(), 'data{0}'.format(x))
-            copy_root = os.path.join(node1.get_path(), 'data{0}_copy'.format(x))
-            for ddir in os.listdir(data_dir):
-                keyspace_dir = os.path.join(data_dir, ddir)
-                if os.path.isdir(keyspace_dir) and ddir != 'system':
-                    copy_dir = os.path.join(copy_root, ddir)
-                    dir_util.copy_tree(keyspace_dir, copy_dir)
+        self.copy_sstables(cluster, node1)
 
         debug("Wiping out the data and restarting cluster")
         # wipe out the node data.
@@ -140,22 +162,7 @@ class BaseSStableLoaderTest(Tester):
 
         debug("Calling sstableloader")
         # call sstableloader to re-load each cf.
-        cdir = node1.get_install_dir()
-        sstableloader = os.path.join(cdir, 'bin', ccmcommon.platform_binary('sstableloader'))
-        env = ccmcommon.make_cassandra_env(cdir, node1.get_path())
-        host = node1.address()
-        for x in xrange(0, cluster.data_dir_count):
-            sstablecopy_dir = os.path.join(node1.get_path(), 'data{0}_copy'.format(x), ks.strip('"'))
-            for cf_dir in os.listdir(sstablecopy_dir):
-                full_cf_dir = os.path.join(sstablecopy_dir, cf_dir)
-                if os.path.isdir(full_cf_dir):
-                    cmd_args = [sstableloader, '--nodes', host, full_cf_dir]
-                    p = subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env)
-                    exit_status = p.wait()
-                    debug('stdout: {out}'.format(out=p.stdout))
-                    debug('stderr: {err}'.format(err=p.stderr))
-                    self.assertEqual(0, exit_status,
-                                     "sstableloader exited with a non-zero status: {}".format(exit_status))
+        self.load_sstables(cluster, node1, ks)
 
         def read_and_validate_data(session):
             for i in range(NUM_KEYS):
@@ -287,3 +294,68 @@ class TestSSTableGenerationAndLoading(BaseSStableLoaderTest):
                             "PRIMARY KEY (v)")
 
         self.load_sstable_with_configuration(ks='"Keyspace1"', create_schema=create_schema_with_mv)
+
+    @since('4.0')
+    def sstableloader_with_failing_2i_test(self):
+        """
+        @jira_ticket CASSANDRA-10130
+
+        Simulates an index build failure during SSTable loading.
+        The table data should be loaded and the index should be marked for rebuilding during the next node start.
+        """
+        def create_schema_with_2i(session):
+            create_ks(session, 'k', 1)
+            session.execute("CREATE TABLE k.t (p int, c int, v int, PRIMARY KEY(p, c))")
+            session.execute("CREATE INDEX idx ON k.t(v)")
+
+        cluster = self.cluster
+        cluster.populate(1, install_byteman=True).start(wait_for_binary_proto=True)
+        node = cluster.nodelist()[0]
+
+        session = self.patient_cql_connection(node)
+        create_schema_with_2i(session)
+        session.execute("INSERT INTO k.t(p, c, v) VALUES (0, 1, 8)")
+
+        # Stop node and copy SSTables
+        node.nodetool('drain')
+        node.stop()
+        self.copy_sstables(cluster, node)
+
+        # Wipe out data and restart
+        cluster.clear()
+        cluster.start()
+
+        # Restore the schema
+        session = self.patient_cql_connection(node)
+        create_schema_with_2i(session)
+
+        # The table should exist and be empty, and the index should be empty and marked as built
+        assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
+        assert_none(session, "SELECT * FROM k.t")
+        assert_none(session, "SELECT * FROM k.t WHERE v = 8")
+
+        # Add some additional data before loading the SSTables, to check that it will still be accessible
+        session.execute("INSERT INTO k.t(p, c, v) VALUES (0, 2, 8)")
+        assert_one(session, "SELECT * FROM k.t", [0, 2, 8])
+        assert_one(session, "SELECT * FROM k.t WHERE v = 8", [0, 2, 8])
+
+        # Load the SSTables with a failure during index building
+        node.byteman_submit(['./byteman/index_build_failure.btm'])
+        with self.assertRaises(Exception):
+            self.load_sstables(cluster, node, 'k')
+
+        # Check that the index isn't marked as built and the old SSTable data has been loaded but not indexed
+        assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""")
+        assert_all(session, "SELECT * FROM k.t", [[0, 1, 8], [0, 2, 8]])
+        assert_one(session, "SELECT * FROM k.t WHERE v = 8", [0, 2, 8])
+
+        # Restart the node to trigger index rebuild
+        node.nodetool('drain')
+        node.stop()
+        cluster.start()
+        session = self.patient_cql_connection(node)
+
+        # Check that the index is marked as built and has been rebuilt
+        assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx'])
+        assert_all(session, "SELECT * FROM k.t", [[0, 1, 8], [0, 2, 8]])
+        assert_all(session, "SELECT * FROM k.t WHERE v = 8", [[0, 1, 8], [0, 2, 8]])

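Note: the load_sstables helper above shells out to the sstableloader tool once per table
directory. A minimal sketch of the equivalent standalone invocation (paths, host and the
table directory name are placeholders, not taken from the tests):

    import subprocess

    sstableloader = '/path/to/cassandra/bin/sstableloader'
    table_dir = '/path/to/data0_copy/k/t-<table_id>'  # one table directory per run

    # --nodes tells sstableloader which host(s) to stream the SSTables to.
    exit_status = subprocess.call([sstableloader, '--nodes', '127.0.0.1', table_dir])
    assert exit_status == 0, 'sstableloader exited with status {}'.format(exit_status)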
