http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/commitlog_test.py ---------------------------------------------------------------------- diff --git a/commitlog_test.py b/commitlog_test.py index 99c4caf..c8830cd 100644 --- a/commitlog_test.py +++ b/commitlog_test.py @@ -5,6 +5,8 @@ import stat import struct import time from distutils.version import LooseVersion +import pytest +import logging from cassandra import WriteTimeout from cassandra.cluster import NoHostAvailable, OperationTimedOut @@ -12,28 +14,33 @@ from ccmlib.common import is_win from ccmlib.node import Node, TimeoutError from parse import parse -from dtest import Tester, debug, create_ks -from tools.assertions import assert_almost_equal, assert_none, assert_one +from dtest import Tester, create_ks +from tools.assertions import (assert_almost_equal, assert_none, assert_one, assert_lists_equal_ignoring_order) from tools.data import rows_to_list -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) class TestCommitLog(Tester): """ CommitLog Tests """ - allow_log_errors = True + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.allow_log_errors = True + + @pytest.fixture(scope='function', autouse=True) + def fixture_set_cluster_settings(self, fixture_dtest_setup): + fixture_dtest_setup.cluster.populate(1) + [self.node1] = fixture_dtest_setup.cluster.nodelist() - def setUp(self): - super(TestCommitLog, self).setUp() - self.cluster.populate(1) - [self.node1] = self.cluster.nodelist() + yield - def tearDown(self): # Some of the tests change commitlog permissions to provoke failure # so this changes them back so we can delete them. 
self._change_commitlog_perms(stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC) - super(TestCommitLog, self).tearDown() + def prepare(self, configuration=None, create_test_keyspace=True, **kwargs): if configuration is None: @@ -41,7 +48,7 @@ class TestCommitLog(Tester): default_conf = {'commitlog_sync_period_in_ms': 1000} set_conf = dict(default_conf, **configuration) - debug('setting commitlog configuration with the following values: ' + logger.debug('setting commitlog configuration with the following values: ' '{set_conf} and the following kwargs: {kwargs}'.format( set_conf=set_conf, kwargs=kwargs)) self.cluster.set_configuration_options(values=set_conf, **kwargs) @@ -61,15 +68,15 @@ class TestCommitLog(Tester): def _change_commitlog_perms(self, mod): for path in self._get_commitlog_paths(): - debug('changing permissions to {perms} on {path}'.format(perms=oct(mod), path=path)) + logger.debug('changing permissions to {perms} on {path}'.format(perms=oct(mod), path=path)) os.chmod(path, mod) commitlogs = glob.glob(path + '/*') if commitlogs: - debug('changing permissions to {perms} on the following files:' + logger.debug('changing permissions to {perms} on the following files:' '\n {files}'.format(perms=oct(mod), files='\n '.join(commitlogs))) else: - debug(self._change_commitlog_perms.__name__ + ' called on empty commitlog directory ' + logger.debug(self._change_commitlog_perms.__name__ + ' called on empty commitlog directory ' '{path} with permissions {perms}'.format(path=path, perms=oct(mod))) for commitlog in commitlogs: @@ -108,7 +115,7 @@ class TestCommitLog(Tester): time.sleep(1) commitlogs = self._get_commitlog_files() - self.assertGreater(len(commitlogs), 0, 'No commit log files were created') + assert len(commitlogs) > 0, 'No commit log files were created' # the most recently-written segment of the commitlog may be smaller # than the expected size, so we allow exactly one segment to be smaller @@ -116,23 +123,23 @@ class TestCommitLog(Tester): for i, f in 
enumerate(commitlogs): size = os.path.getsize(f) size_in_mb = int(size / 1024 / 1024) - debug('segment file {} {}; smaller already found: {}'.format(f, size_in_mb, smaller_found)) + logger.debug('segment file {} {}; smaller already found: {}'.format(f, size_in_mb, smaller_found)) if size_in_mb < 1 or size < (segment_size * 0.1): - debug('segment file not yet used; moving to next file') + logger.debug('segment file not yet used; moving to next file') continue # commitlog not yet used try: if compressed: # if compression is used, we assume there will be at most a 50% compression ratio - self.assertLess(size, segment_size) - self.assertGreater(size, segment_size / 2) + assert size < segment_size + assert size > segment_size / 2 else: # if no compression is used, the size will be close to what we expect assert_almost_equal(size, segment_size, error=0.05) except AssertionError as e: # the last segment may be smaller if not smaller_found: - self.assertLessEqual(size, segment_size) + assert size <= segment_size smaller_found = True else: raise e @@ -141,7 +148,7 @@ class TestCommitLog(Tester): """ Provoke the commitlog failure """ - debug('Provoking commitlog failure') + logger.debug('Provoking commitlog failure') # Test things are ok at this point self.session1.execute(""" INSERT INTO test (key, col1) VALUES (1, 1); @@ -164,17 +171,16 @@ class TestCommitLog(Tester): replay due to MV lock contention. Fixed in 3.0.7 and 3.7. 
@jira_ticket CASSANDRA-11891 """ - cluster_ver = self.cluster.version() if LooseVersion('3.1') <= cluster_ver < LooseVersion('3.7'): - self.skipTest("Fixed in 3.0.7 and 3.7") + pytest.skip("Fixed in 3.0.7 and 3.7") node1 = self.node1 node1.set_batch_commitlog(enabled=True) node1.start() session = self.patient_cql_connection(node1) - debug("Creating schema") + logger.debug("Creating schema") create_ks(session, 'Test', 1) session.execute(""" CREATE TABLE mytable ( @@ -192,37 +198,37 @@ class TestCommitLog(Tester): PRIMARY KEY (a, b); """) - debug("Insert data") + logger.debug("Insert data") num_rows = 1024 # maximum number of mutations replayed at once by the commit log - for i in xrange(num_rows): + for i in range(num_rows): session.execute("INSERT INTO Test.mytable (a, b, c) VALUES (0, {i}, {i})".format(i=i)) node1.stop(gently=False) node1.mark_log_for_errors() - debug("Verify commitlog was written before abrupt stop") + logger.debug("Verify commitlog was written before abrupt stop") commitlog_files = os.listdir(os.path.join(node1.get_path(), 'commitlogs')) - self.assertNotEqual([], commitlog_files) + assert [] != commitlog_files # set a short timeout to ensure lock contention will generally exceed this node1.set_configuration_options({'write_request_timeout_in_ms': 30}) - debug("Starting node again") + logger.debug("Starting node again") node1.start() - debug("Verify commit log was replayed on startup") + logger.debug("Verify commit log was replayed on startup") start_time, replay_complete = time.time(), False while not replay_complete: matches = node1.grep_log(r".*WriteTimeoutException.*") - self.assertEqual([], matches) + assert [] == matches replay_complete = node1.grep_log("Log replay complete") - self.assertLess(time.time() - start_time, 120, "Did not finish commitlog replay within 120 seconds") + assert time.time() - start_time < 120, "Did not finish commitlog replay within 120 seconds" - debug("Reconnecting to node") + logger.debug("Reconnecting to node") 
session = self.patient_cql_connection(node1) - debug("Make query to ensure data is present") + logger.debug("Make query to ensure data is present") res = list(session.execute("SELECT * FROM Test.mytable")) - self.assertEqual(num_rows, len(res), res) + assert num_rows == len(res), res def test_commitlog_replay_on_startup(self): """ @@ -232,7 +238,7 @@ class TestCommitLog(Tester): node1.set_batch_commitlog(enabled=True) node1.start() - debug("Insert data") + logger.debug("Insert data") session = self.patient_cql_connection(node1) create_ks(session, 'Test', 1) session.execute(""" @@ -247,69 +253,67 @@ class TestCommitLog(Tester): session.execute("INSERT INTO Test. users (user_name, password, gender, state, birth_year) " "VALUES('gandalf', 'p@$$', 'male', 'WA', 1955);") - debug("Verify data is present") + logger.debug("Verify data is present") session = self.patient_cql_connection(node1) res = session.execute("SELECT * FROM Test. users") - self.assertItemsEqual(rows_to_list(res), - [[u'gandalf', 1955, u'male', u'p@$$', u'WA']]) + assert rows_to_list(res) == [['gandalf', 1955, 'male', 'p@$$', 'WA']] - debug("Stop node abruptly") + logger.debug("Stop node abruptly") node1.stop(gently=False) - debug("Verify commitlog was written before abrupt stop") + logger.debug("Verify commitlog was written before abrupt stop") commitlog_dir = os.path.join(node1.get_path(), 'commitlogs') commitlog_files = os.listdir(commitlog_dir) - self.assertTrue(len(commitlog_files) > 0) + assert len(commitlog_files) > 0 - debug("Verify no SSTables were flushed before abrupt stop") - self.assertEqual(0, len(node1.get_sstables('test', 'users'))) + logger.debug("Verify no SSTables were flushed before abrupt stop") + assert 0 == len(node1.get_sstables('test', 'users')) - debug("Verify commit log was replayed on startup") + logger.debug("Verify commit log was replayed on startup") node1.start() node1.watch_log_for("Log replay complete") # Here we verify from the logs that some mutations were replayed 
replays = [match_tuple[0] for match_tuple in node1.grep_log(" \d+ replayed mutations")] - debug('The following log lines indicate that mutations were replayed: {msgs}'.format(msgs=replays)) + logger.debug('The following log lines indicate that mutations were replayed: {msgs}'.format(msgs=replays)) num_replayed_mutations = [ parse('{} {num_mutations:d} replayed mutations{}', line).named['num_mutations'] for line in replays ] # assert there were some lines where more than zero mutations were replayed - self.assertNotEqual([m for m in num_replayed_mutations if m > 0], []) + assert [m for m in num_replayed_mutations if m > 0] != [] - debug("Make query and ensure data is present") + logger.debug("Make query and ensure data is present") session = self.patient_cql_connection(node1) res = session.execute("SELECT * FROM Test. users") - self.assertItemsEqual(rows_to_list(res), - [[u'gandalf', 1955, u'male', u'p@$$', u'WA']]) + assert_lists_equal_ignoring_order(rows_to_list(res), [['gandalf', 1955, 'male', 'p@$$', 'WA']]) - def default_segment_size_test(self): + def test_default_segment_size(self): """ Test default commitlog_segment_size_in_mb (32MB) """ self._segment_size_test(32) - def small_segment_size_test(self): + def test_small_segment_size(self): """ Test a small commitlog_segment_size_in_mb (5MB) """ self._segment_size_test(5) @since('2.2') - def default_compressed_segment_size_test(self): + def test_default_compressed_segment_size(self): """ Test default compressed commitlog_segment_size_in_mb (32MB) """ self._segment_size_test(32, compressed=True) @since('2.2') - def small_compressed_segment_size_test(self): + def test_small_compressed_segment_size(self): """ Test a small compressed commitlog_segment_size_in_mb (5MB) """ self._segment_size_test(5, compressed=True) - def stop_failure_policy_test(self): + def test_stop_failure_policy(self): """ Test the stop commitlog failure policy (default one) """ @@ -317,23 +321,23 @@ class TestCommitLog(Tester): 
self._provoke_commitlog_failure() failure = self.node1.grep_log("Failed .+ commit log segments. Commit disk failure policy is stop; terminating thread") - debug(failure) - self.assertTrue(failure, "Cannot find the commitlog failure message in logs") - self.assertTrue(self.node1.is_running(), "Node1 should still be running") + logger.debug(failure) + assert failure, "Cannot find the commitlog failure message in logs" + assert self.node1.is_running(), "Node1 should still be running" # Cannot write anymore after the failure - with self.assertRaises(NoHostAvailable): + with pytest.raises(NoHostAvailable): self.session1.execute(""" INSERT INTO test (key, col1) VALUES (2, 2); """) # Should not be able to read neither - with self.assertRaises(NoHostAvailable): + with pytest.raises(NoHostAvailable): self.session1.execute(""" "SELECT * FROM test;" """) - def stop_commit_failure_policy_test(self): + def test_stop_commit_failure_policy(self): """ Test the stop_commit commitlog failure policy """ @@ -347,26 +351,26 @@ class TestCommitLog(Tester): self._provoke_commitlog_failure() failure = self.node1.grep_log("Failed .+ commit log segments. 
Commit disk failure policy is stop_commit; terminating thread") - debug(failure) - self.assertTrue(failure, "Cannot find the commitlog failure message in logs") - self.assertTrue(self.node1.is_running(), "Node1 should still be running") + logger.debug(failure) + assert failure, "Cannot find the commitlog failure message in logs" + assert self.node1.is_running(), "Node1 should still be running" # Cannot write anymore after the failure - debug('attempting to insert to node with failing commitlog; should fail') - with self.assertRaises((OperationTimedOut, WriteTimeout)): + logger.debug('attempting to insert to node with failing commitlog; should fail') + with pytest.raises((OperationTimedOut, WriteTimeout)): self.session1.execute(""" INSERT INTO test (key, col1) VALUES (2, 2); """) # Should be able to read - debug('attempting to read from node with failing commitlog; should succeed') + logger.debug('attempting to read from node with failing commitlog; should succeed') assert_one( self.session1, "SELECT * FROM test where key=2;", [2, 2] ) - def die_failure_policy_test(self): + def test_die_failure_policy(self): """ Test the die commitlog failure policy """ @@ -376,11 +380,11 @@ class TestCommitLog(Tester): self._provoke_commitlog_failure() failure = self.node1.grep_log("ERROR \[COMMIT-LOG-ALLOCATOR\].+JVM state determined to be unstable. 
Exiting forcefully") - debug(failure) - self.assertTrue(failure, "Cannot find the commitlog failure message in logs") - self.assertFalse(self.node1.is_running(), "Node1 should not be running") + logger.debug(failure) + assert failure, "Cannot find the commitlog failure message in logs" + assert not self.node1.is_running(), "Node1 should not be running" - def ignore_failure_policy_test(self): + def test_ignore_failure_policy(self): """ Test the ignore commitlog failure policy """ @@ -390,8 +394,8 @@ class TestCommitLog(Tester): self._provoke_commitlog_failure() failure = self.node1.grep_log("ERROR \[COMMIT-LOG-ALLOCATOR\].+Failed .+ commit log segments") - self.assertTrue(failure, "Cannot find the commitlog failure message in logs") - self.assertTrue(self.node1.is_running(), "Node1 should still be running") + assert failure, "Cannot find the commitlog failure message in logs" + assert self.node1.is_running(), "Node1 should still be running" # on Windows, we can't delete the segments if they're chmod to 0 so they'll still be available for use by CLSM, # and we can still create new segments since os.chmod is limited to stat.S_IWRITE and stat.S_IREAD to set files @@ -401,10 +405,10 @@ class TestCommitLog(Tester): if is_win(): # We expect this to succeed self.session1.execute(query) - self.assertFalse(self.node1.grep_log("terminating thread"), "thread was terminated but CL error should have been ignored.") - self.assertTrue(self.node1.is_running(), "Node1 should still be running after an ignore error on CL") + assert not self.node1.grep_log("terminating thread"), "thread was terminated but CL error should have been ignored." 
+ assert self.node1.is_running(), "Node1 should still be running after an ignore error on CL" else: - with self.assertRaises((OperationTimedOut, WriteTimeout)): + with pytest.raises((OperationTimedOut, WriteTimeout)): self.session1.execute(query) # Should not exist @@ -436,13 +440,11 @@ class TestCommitLog(Tester): and the commit_failure_policy is stop, C* shouldn't startup @jira_ticket CASSANDRA-9749 """ - if not hasattr(self, 'ignore_log_patterns'): - self.ignore_log_patterns = [] - expected_error = "Exiting due to error while processing commit log during initialization." - self.ignore_log_patterns.append(expected_error) + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + expected_error] node = self.node1 - self.assertIsInstance(node, Node) + assert isinstance(node, Node) node.set_configuration_options({'commit_failure_policy': 'stop', 'commitlog_sync_period_in_ms': 1000}) self.cluster.start() @@ -454,7 +456,7 @@ class TestCommitLog(Tester): cursor.execute("INSERT INTO ks.tbl (k, v) VALUES ({0}, {0})".format(i)) results = list(cursor.execute("SELECT * FROM ks.tbl")) - self.assertEqual(len(results), 10) + assert len(results) == 10 # with the commitlog_sync_period_in_ms set to 1000, # this sleep guarantees that the commitlog data is @@ -469,14 +471,14 @@ class TestCommitLog(Tester): ks_dir = os.path.join(data_dir, 'ks') db_dir = os.listdir(ks_dir)[0] sstables = len([f for f in os.listdir(os.path.join(ks_dir, db_dir)) if f.endswith('.db')]) - self.assertEqual(sstables, 0) + assert sstables == 0 # modify the commit log crc values cl_dir = os.path.join(path, 'commitlogs') - self.assertTrue(len(os.listdir(cl_dir)) > 0) + assert len(os.listdir(cl_dir)) > 0 for cl in os.listdir(cl_dir): # locate the CRC location - with open(os.path.join(cl_dir, cl), 'r') as f: + with open(os.path.join(cl_dir, cl), 'rb') as f: f.seek(0) version = struct.unpack('>i', f.read(4))[0] crc_pos = 12 @@ -486,22 +488,22 @@ class 
TestCommitLog(Tester): crc_pos += 2 + psize # rewrite it with crap - with open(os.path.join(cl_dir, cl), 'w') as f: + with open(os.path.join(cl_dir, cl), 'wb') as f: f.seek(crc_pos) f.write(struct.pack('>i', 123456)) # verify said crap - with open(os.path.join(cl_dir, cl), 'r') as f: + with open(os.path.join(cl_dir, cl), 'rb') as f: f.seek(crc_pos) crc = struct.unpack('>i', f.read(4))[0] - self.assertEqual(crc, 123456) + assert crc == 123456 mark = node.mark_log() node.start() node.watch_log_for(expected_error, from_mark=mark) - with self.assertRaises(TimeoutError): + with pytest.raises(TimeoutError): node.wait_for_binary_interface(from_mark=mark, timeout=20) - self.assertFalse(node.is_running()) + assert not node.is_running() @since('2.2') def test_compression_error(self): @@ -510,13 +512,11 @@ class TestCommitLog(Tester): if the commit log header refers to an unknown compression class, and the commit_failure_policy is stop, C* shouldn't start up """ - if not hasattr(self, 'ignore_log_patterns'): - self.ignore_log_patterns = [] - expected_error = 'Could not create Compression for type org.apache.cassandra.io.compress.LZ5Compressor' - self.ignore_log_patterns.append(expected_error) + self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [ + expected_error] node = self.node1 - self.assertIsInstance(node, Node) + assert isinstance(node, Node) node.set_configuration_options({'commit_failure_policy': 'stop', 'commitlog_compression': [{'class_name': 'LZ4Compressor'}], 'commitlog_sync_period_in_ms': 1000}) @@ -530,7 +530,7 @@ class TestCommitLog(Tester): cursor.execute("INSERT INTO ks1.tbl (k, v) VALUES ({0}, {0})".format(i)) results = list(cursor.execute("SELECT * FROM ks1.tbl")) - self.assertEqual(len(results), 10) + assert len(results) == 10 # with the commitlog_sync_period_in_ms set to 1000, # this sleep guarantees that the commitlog data is @@ -545,31 +545,37 @@ class TestCommitLog(Tester): ks_dir = 
os.path.join(data_dir, 'ks1') db_dir = os.listdir(ks_dir)[0] sstables = sstables + len([f for f in os.listdir(os.path.join(ks_dir, db_dir)) if f.endswith('.db')]) - self.assertEqual(sstables, 0) + assert sstables == 0 def get_header_crc(header): """ When calculating the header crc, C* splits up the 8b id, first adding the 4 least significant bytes to the crc, then the 5 most significant bytes, so this splits them and calculates the same way """ - new_header = header[:4] + new_header = bytearray(header[:4]) # C* evaluates most and least significant 4 bytes out of order - new_header += header[8:12] - new_header += header[4:8] + new_header.extend(header[8:12]) + new_header.extend(header[4:8]) # C* evaluates the short parameter length as an int - new_header += '\x00\x00' + header[12:14] # the - new_header += header[14:] - return binascii.crc32(new_header) + new_header.extend(b'\x00\x00') + new_header.extend(header[12:14]) # the + new_header.extend(header[14:]) + + # https://docs.python.org/2/library/binascii.html + # "Changed in version 2.6: The return value is in the range [-2**31, 2**31-1] regardless + # of platform. In the past the value would be signed on some platforms and unsigned on + # others. Use & 0xffffffff on the value if you want it to match Python 3 behavior." 
+ return binascii.crc32(new_header) & 0xffffffff # modify the compression parameters to look for a compressor that isn't there # while this scenario is pretty unlikely, if a jar or lib got moved or something, # you'd have a similar situation, which would be fixable by the user path = node.get_path() cl_dir = os.path.join(path, 'commitlogs') - self.assertTrue(len(os.listdir(cl_dir)) > 0) + assert len(os.listdir(cl_dir)) > 0 for cl in os.listdir(cl_dir): # read the header and find the crc location - with open(os.path.join(cl_dir, cl), 'r') as f: + with open(os.path.join(cl_dir, cl), 'rb') as f: f.seek(0) crc_pos = 12 f.seek(crc_pos) @@ -583,29 +589,39 @@ class TestCommitLog(Tester): # check that we're going this right f.seek(0) header_bytes = f.read(header_length) - self.assertEqual(get_header_crc(header_bytes), crc) + + # https://docs.python.org/2/library/binascii.html + # "Changed in version 2.6: The return value is in the range [-2**31, 2**31-1] regardless + # of platform. In the past the value would be signed on some platforms and unsigned on + # others. Use & 0xffffffff on the value if you want it to match Python 3 behavior." 
+ assert get_header_crc(header_bytes) == (crc & 0xffffffff) # rewrite it with imaginary compressor - self.assertIn('LZ4Compressor', header_bytes) - header_bytes = header_bytes.replace('LZ4Compressor', 'LZ5Compressor') - self.assertNotIn('LZ4Compressor', header_bytes) - self.assertIn('LZ5Compressor', header_bytes) - with open(os.path.join(cl_dir, cl), 'w') as f: + assert 'LZ4Compressor'.encode("ascii") in header_bytes + header_bytes = header_bytes.replace('LZ4Compressor'.encode("ascii"), 'LZ5Compressor'.encode("ascii")) + assert 'LZ4Compressor'.encode("ascii") not in header_bytes + assert 'LZ5Compressor'.encode("ascii") in header_bytes + with open(os.path.join(cl_dir, cl), 'wb') as f: f.seek(0) f.write(header_bytes) f.seek(crc_pos) - f.write(struct.pack('>i', get_header_crc(header_bytes))) + f.write(struct.pack('>I', get_header_crc(header_bytes))) # verify we wrote everything correctly - with open(os.path.join(cl_dir, cl), 'r') as f: + with open(os.path.join(cl_dir, cl), 'rb') as f: f.seek(0) - self.assertEqual(f.read(header_length), header_bytes) + assert f.read(header_length) == header_bytes f.seek(crc_pos) crc = struct.unpack('>i', f.read(4))[0] - self.assertEqual(crc, get_header_crc(header_bytes)) + + # https://docs.python.org/2/library/binascii.html + # "Changed in version 2.6: The return value is in the range [-2**31, 2**31-1] regardless + # of platform. In the past the value would be signed on some platforms and unsigned on + # others. Use & 0xffffffff on the value if you want it to match Python 3 behavior." + assert (crc & 0xffffffff) == get_header_crc(header_bytes) mark = node.mark_log() node.start() node.watch_log_for(expected_error, from_mark=mark) - with self.assertRaises(TimeoutError): + with pytest.raises(TimeoutError): node.wait_for_binary_interface(from_mark=mark, timeout=20)
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/compaction_test.py ---------------------------------------------------------------------- diff --git a/compaction_test.py b/compaction_test.py index 999f83c..49a2923 100644 --- a/compaction_test.py +++ b/compaction_test.py @@ -5,25 +5,29 @@ import string import tempfile import time from distutils.version import LooseVersion - +import pytest import parse +import logging -from dtest import Tester, debug, create_ks +from dtest import Tester, create_ks from tools.assertions import assert_length_equal, assert_none, assert_one -from tools.decorators import since +since = pytest.mark.since +logger = logging.getLogger(__name__) -class TestCompaction(Tester): +strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy'] - __test__ = False - def setUp(self): - Tester.setUp(self) +class TestCompaction(Tester): + + @pytest.fixture(scope='function', autouse=True) + def fixture_set_cluster_log_level(self, fixture_dtest_setup): # compaction test for version 2.2.2 and above relies on DEBUG log in debug.log - self.cluster.set_log_level("DEBUG") + fixture_dtest_setup.cluster.set_log_level("DEBUG") - @since('0', '2.2.X') - def compaction_delete_test(self): + @pytest.mark.parametrize("strategy", strategies) + @since('0', max_version='2.2.X') + def test_compaction_delete(self, strategy): """ Test that executing a delete properly tombstones a row. Insert data, delete a partition of data and check that the requesite rows are tombstoned. 
@@ -35,7 +39,7 @@ class TestCompaction(Tester): session = self.patient_cql_connection(node1) create_ks(session, 'ks', 1) - session.execute("create table ks.cf (key int PRIMARY KEY, val int) with compaction = {'class':'" + self.strategy + "'} and gc_grace_seconds = 30;") + session.execute("create table ks.cf (key int PRIMARY KEY, val int) with compaction = {'class':'" + strategy + "'} and gc_grace_seconds = 30;") for x in range(0, 100): session.execute('insert into cf (key, val) values (' + str(x) + ',1)') @@ -58,9 +62,9 @@ class TestCompaction(Tester): numfound = jsoninfo.count("markedForDeleteAt") - self.assertEqual(numfound, 10) + assert numfound == 10 - def data_size_test(self): + def test_data_size(self): """ Ensure that data size does not have unwarranted increases after compaction. Insert data and check data size before and after a compaction. @@ -80,8 +84,8 @@ class TestCompaction(Tester): output = output[output.find("Space used (live)"):] initialValue = int(output[output.find(":") + 1:output.find("\n")].strip()) else: - debug("datasize not found") - debug(output) + logger.debug("datasize not found") + logger.debug(output) node1.flush() node1.compact() @@ -93,31 +97,32 @@ class TestCompaction(Tester): output = output[output.find("Space used (live)"):] finalValue = int(output[output.find(":") + 1:output.find("\n")].strip()) else: - debug("datasize not found") + logger.debug("datasize not found") # allow 5% size increase - if we have few sstables it is not impossible that live size increases *slightly* after compaction - self.assertLess(finalValue, initialValue * 1.05) + assert finalValue < initialValue * 1.05 - def bloomfilter_size_test(self): + @pytest.mark.parametrize("strategy", strategies) + def test_bloomfilter_size(self, strategy): """ @jira_ticket CASSANDRA-11344 Check that bloom filter size is between 50KB and 100KB for 100K keys """ - if not hasattr(self, 'strategy') or self.strategy == "LeveledCompactionStrategy": + if not hasattr(self, 'strategy') 
or strategy == "LeveledCompactionStrategy": strategy_string = 'strategy=LeveledCompactionStrategy,sstable_size_in_mb=1' min_bf_size = 40000 max_bf_size = 100000 else: - if self.strategy == "DateTieredCompactionStrategy": + if strategy == "DateTieredCompactionStrategy": strategy_string = "strategy=DateTieredCompactionStrategy,base_time_seconds=86400" # we want a single sstable, so make sure we don't have a tiny first window else: - strategy_string = "strategy={}".format(self.strategy) + strategy_string = "strategy={}".format(strategy) min_bf_size = 100000 max_bf_size = 150000 cluster = self.cluster cluster.populate(1).start(wait_for_binary_proto=True) [node1] = cluster.nodelist() - for x in xrange(0, 5): + for x in range(0, 5): node1.stress(['write', 'n=100K', "no-warmup", "cl=ONE", "-rate", "threads=300", "-schema", "replication(factor=1)", "compaction({},enabled=false)".format(strategy_string)]) @@ -134,36 +139,37 @@ class TestCompaction(Tester): # in some rare cases we can end up with more than one sstable per data directory with # non-lcs strategies (see CASSANDRA-12323) - if not hasattr(self, 'strategy') or self.strategy == "LeveledCompactionStrategy": + if not hasattr(self, 'strategy') or strategy == "LeveledCompactionStrategy": size_factor = 1 else: sstable_count = len(node1.get_sstables('keyspace1', 'standard1')) dir_count = len(node1.data_directories()) - debug("sstable_count is: {}".format(sstable_count)) - debug("dir_count is: {}".format(dir_count)) + logger.debug("sstable_count is: {}".format(sstable_count)) + logger.debug("dir_count is: {}".format(dir_count)) if node1.get_cassandra_version() < LooseVersion('3.2'): size_factor = sstable_count else: size_factor = sstable_count / float(dir_count) - debug("bloom filter size is: {}".format(bfSize)) - debug("size factor = {}".format(size_factor)) - self.assertGreaterEqual(bfSize, size_factor * min_bf_size) - self.assertLessEqual(bfSize, size_factor * max_bf_size) + logger.debug("bloom filter size is: 
{}".format(bfSize)) + logger.debug("size factor = {}".format(size_factor)) + assert bfSize >= size_factor * min_bf_size + assert bfSize <= size_factor * max_bf_size - def sstable_deletion_test(self): + @pytest.mark.parametrize("strategy", strategies) + def test_sstable_deletion(self, strategy): """ Test that sstables are deleted properly when able after compaction. Insert data setting gc_grace_seconds to 0, and determine sstable is deleted upon data deletion. """ - self.skip_if_no_major_compaction() + self.skip_if_no_major_compaction(strategy) cluster = self.cluster cluster.populate(1).start(wait_for_binary_proto=True) [node1] = cluster.nodelist() session = self.patient_cql_connection(node1) create_ks(session, 'ks', 1) - session.execute("create table cf (key int PRIMARY KEY, val int) with gc_grace_seconds = 0 and compaction= {'class':'" + self.strategy + "'}") + session.execute("create table cf (key int PRIMARY KEY, val int) with gc_grace_seconds = 0 and compaction= {'class':'" + strategy + "'}") for x in range(0, 100): session.execute('insert into cf (key, val) values (' + str(x) + ',1)') @@ -180,22 +186,21 @@ class TestCompaction(Tester): cfs = os.listdir(os.path.join(data_dir, "ks")) ssdir = os.listdir(os.path.join(data_dir, "ks", cfs[0])) for afile in ssdir: - self.assertFalse("Data" in afile, afile) + assert not "Data" in afile, afile except OSError: self.fail("Path to sstables not valid.") - def dtcs_deletion_test(self): + @pytest.mark.parametrize("strategy", ['DateTieredCompactionStrategy']) + def test_dtcs_deletion(self, strategy): """ Test that sstables are deleted properly when able after compaction with DateTieredCompactionStrategy. Insert data setting max_sstable_age_days low, and determine sstable is deleted upon data deletion past max_sstable_age_days. 
""" - if not hasattr(self, 'strategy'): - self.strategy = 'DateTieredCompactionStrategy' - elif self.strategy != 'DateTieredCompactionStrategy': - self.skipTest('Not implemented unless DateTieredCompactionStrategy is used') + if strategy != 'DateTieredCompactionStrategy': + pytest.skip('Not implemented unless DateTieredCompactionStrategy is used') cluster = self.cluster cluster.populate(1).start(wait_for_binary_proto=True) @@ -215,7 +220,7 @@ class TestCompaction(Tester): expected_sstable_count = 1 if self.cluster.version() > LooseVersion('3.1'): expected_sstable_count = cluster.data_dir_count - self.assertEqual(len(expired_sstables), expected_sstable_count) + assert len(expired_sstables) == expected_sstable_count # write a new sstable to make DTCS check for expired sstables: for x in range(0, 100): session.execute('insert into cf (key, val) values ({}, {})'.format(x, x)) @@ -223,7 +228,7 @@ class TestCompaction(Tester): time.sleep(5) # we only check every 10 minutes - sstable should still be there: for expired_sstable in expired_sstables: - self.assertIn(expired_sstable, node1.get_sstables('ks', 'cf')) + assert expired_sstable, node1.get_sstables('ks' in 'cf') session.execute("alter table cf with compaction = {'class':'DateTieredCompactionStrategy', 'max_sstable_age_days':0.00035, 'min_threshold':2, 'expired_sstable_check_frequency_seconds':0}") time.sleep(1) @@ -232,9 +237,9 @@ class TestCompaction(Tester): node1.flush() time.sleep(5) for expired_sstable in expired_sstables: - self.assertNotIn(expired_sstable, node1.get_sstables('ks', 'cf')) + assert expired_sstable, node1.get_sstables('ks' not in 'cf') - def compaction_throughput_test(self): + def test_compaction_throughput(self): """ Test setting compaction throughput. Set throughput, insert data and ensure compaction performance corresponds. 
@@ -277,24 +282,26 @@ class TestCompaction(Tester): } units = ['MB'] if cluster.version() < LooseVersion('3.6') else ['KiB', 'MiB', 'GiB'] - self.assertIn(found_units, units) + assert found_units in units - debug(avgthroughput) + logger.debug(avgthroughput) avgthroughput_mb = unit_conversion_dct[found_units] * float(avgthroughput) # The throughput in the log is computed independantly from the throttling and on the output files while # throttling is on the input files, so while that throughput shouldn't be higher than the one set in # principle, a bit of wiggle room is expected - self.assertGreaterEqual(float(threshold) + 0.5, avgthroughput_mb) + assert float(threshold) + 0.5 >= avgthroughput_mb - def compaction_strategy_switching_test(self): - """Ensure that switching strategies does not result in problems. + @pytest.mark.parametrize("strategy", strategies) + def test_compaction_strategy_switching(self, strategy): + """ + Ensure that switching strategies does not result in problems. Insert data, switch strategies, then check against data loss. 
""" strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy'] - if self.strategy in strategies: - strategies.remove(self.strategy) + if strategy in strategies: + strategies.remove(strategy) cluster = self.cluster cluster.populate(1).start(wait_for_binary_proto=True) [node1] = cluster.nodelist() @@ -303,7 +310,7 @@ class TestCompaction(Tester): session = self.patient_cql_connection(node1) create_ks(session, 'ks', 1) - session.execute("create table ks.cf (key int PRIMARY KEY, val int) with gc_grace_seconds = 0 and compaction= {'class':'" + self.strategy + "'};") + session.execute("create table ks.cf (key int PRIMARY KEY, val int) with gc_grace_seconds = 0 and compaction= {'class':'" + strategy + "'};") for x in range(0, 100): session.execute('insert into ks.cf (key, val) values (' + str(x) + ',1)') @@ -326,7 +333,7 @@ class TestCompaction(Tester): time.sleep(5) cluster.start(wait_for_binary_proto=True) - def large_compaction_warning_test(self): + def test_large_compaction_warning(self): """ @jira_ticket CASSANDRA-9643 Check that we log a warning when the partition size is bigger than compaction_large_partition_warning_threshold_mb @@ -347,7 +354,7 @@ class TestCompaction(Tester): ret = list(session.execute("SELECT properties from ks.large where userid = 'user'")) assert_length_equal(ret, 1) - self.assertEqual(200, len(ret[0][0].keys())) + assert 200 == len(list(ret[0][0].keys())) node.flush() @@ -358,9 +365,10 @@ class TestCompaction(Tester): ret = list(session.execute("SELECT properties from ks.large where userid = 'user'")) assert_length_equal(ret, 1) - self.assertEqual(200, len(ret[0][0].keys())) + assert 200 == len(list(ret[0][0].keys())) - def disable_autocompaction_nodetool_test(self): + @pytest.mark.parametrize("strategy", strategies) + def test_disable_autocompaction_nodetool(self, strategy): """ Make sure we can enable/disable compaction using nodetool """ @@ -369,7 +377,7 @@ class TestCompaction(Tester): 
[node] = cluster.nodelist() session = self.patient_cql_connection(node) create_ks(session, 'ks', 1) - session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(self.strategy)) + session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(strategy)) node.nodetool('disableautocompaction ks to_disable') for i in range(1000): session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100)) @@ -379,13 +387,14 @@ class TestCompaction(Tester): log_file = 'system.log' else: log_file = 'debug.log' - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy) node.nodetool('enableautocompaction ks to_disable') # sleep to allow compactions to start time.sleep(2) - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy) - def disable_autocompaction_schema_test(self): + @pytest.mark.parametrize("strategy", strategies) + def test_disable_autocompaction_schema(self, strategy): """ Make sure we can disable compaction via the schema compaction parameter 'enabled' = false """ @@ -394,7 +403,7 @@ class TestCompaction(Tester): [node] = cluster.nodelist() session = self.patient_cql_connection(node) create_ks(session, 'ks', 1) - session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'false\'}}'.format(self.strategy)) + session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\', 
\'enabled\':\'false\'}}'.format(strategy)) for i in range(1000): session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100)) if i % 100 == 0: @@ -404,7 +413,7 @@ class TestCompaction(Tester): else: log_file = 'debug.log' - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy) # should still be disabled after restart: node.stop() node.start(wait_for_binary_proto=True) @@ -412,13 +421,14 @@ class TestCompaction(Tester): session.execute("use ks") # sleep to make sure we dont start any logs time.sleep(2) - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy) node.nodetool('enableautocompaction ks to_disable') # sleep to allow compactions to start time.sleep(2) - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy) - def disable_autocompaction_alter_test(self): + @pytest.mark.parametrize("strategy", strategies) + def test_disable_autocompaction_alter(self, strategy): """ Make sure we can enable compaction using an alter-statement """ @@ -427,8 +437,8 @@ class TestCompaction(Tester): [node] = cluster.nodelist() session = self.patient_cql_connection(node) create_ks(session, 'ks', 1) - session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(self.strategy)) - session.execute('ALTER TABLE to_disable WITH compaction = 
{{\'class\':\'{0}\', \'enabled\':\'false\'}}'.format(self.strategy)) + session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(strategy)) + session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'false\'}}'.format(strategy)) for i in range(1000): session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100)) if i % 100 == 0: @@ -437,16 +447,17 @@ class TestCompaction(Tester): log_file = 'system.log' else: log_file = 'debug.log' - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy)) - session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'true\'}}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy) + session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'true\'}}'.format(strategy)) # we need to flush atleast once when altering to enable: session.execute('insert into to_disable (id, d) values (99, \'hello\')') node.flush() # sleep to allow compactions to start time.sleep(2) - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy) - def disable_autocompaction_alter_and_nodetool_test(self): + @pytest.mark.parametrize("strategy", strategies) + def test_disable_autocompaction_alter_and_nodetool(self, strategy): """ Make sure compaction stays disabled after an alter statement where we have disabled using nodetool first """ @@ -455,7 +466,7 @@ class TestCompaction(Tester): [node] = cluster.nodelist() session = self.patient_cql_connection(node) create_ks(session, 'ks', 1) 
- session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(self.strategy)) + session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(strategy)) node.nodetool('disableautocompaction ks to_disable') for i in range(1000): session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100)) @@ -465,19 +476,19 @@ class TestCompaction(Tester): log_file = 'system.log' else: log_file = 'debug.log' - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy)) - session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'tombstone_threshold\':0.9}}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy) + session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'tombstone_threshold\':0.9}}'.format(strategy)) session.execute('insert into to_disable (id, d) values (99, \'hello\')') node.flush() time.sleep(2) - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found log items for {0}'.format(strategy) node.nodetool('enableautocompaction ks to_disable') # sleep to allow compactions to start time.sleep(2) - self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy)) + assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy) @since('3.7') - def user_defined_compaction_test(self): + def test_user_defined_compaction(self): """ Test a user defined compaction task by generating a few sstables with cassandra 
stress and autocompaction disabled, and then passing a list of sstable data files directly to nodetool compact. @@ -499,20 +510,21 @@ class TestCompaction(Tester): node1.nodetool('flush keyspace1 standard1') sstable_files = ' '.join(node1.get_sstable_data_files('keyspace1', 'standard1')) - debug('Compacting {}'.format(sstable_files)) + logger.debug('Compacting {}'.format(sstable_files)) node1.nodetool('compact --user-defined {}'.format(sstable_files)) sstable_files = node1.get_sstable_data_files('keyspace1', 'standard1') - self.assertEquals(len(node1.data_directories()), len(sstable_files), - 'Expected one sstable data file per node directory but got {}'.format(sstable_files)) + assert len(node1.data_directories()) == len(sstable_files), \ + 'Expected one sstable data file per node directory but got {}'.format(sstable_files) + @pytest.mark.parametrize("strategy", ['LeveledCompactionStrategy']) @since('3.10') - def fanout_size_test(self): + def test_fanout_size(self, strategy): """ @jira_ticket CASSANDRA-11550 """ - if not hasattr(self, 'strategy') or self.strategy != 'LeveledCompactionStrategy': - self.skipTest('Not implemented unless LeveledCompactionStrategy is used') + if not hasattr(self, 'strategy') or strategy != 'LeveledCompactionStrategy': + pytest.skip('Not implemented unless LeveledCompactionStrategy is used') cluster = self.cluster cluster.populate(1).start(wait_for_binary_proto=True) @@ -522,7 +534,7 @@ class TestCompaction(Tester): node1.nodetool('disableautocompaction') session = self.patient_cql_connection(node1) - debug("Altering compaction strategy to LCS") + logger.debug("Altering compaction strategy to LCS") session.execute("ALTER TABLE keyspace1.standard1 with compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 'fanout_size':10};") stress_write(node1, keycount=1000000) @@ -538,9 +550,9 @@ class TestCompaction(Tester): # [0, ?/10, ?, 0, 0, 0...] 
p = re.compile(r'0,\s(\d+)/10,.*') m = p.search(output) - self.assertEqual(10 * len(node1.data_directories()), int(m.group(1))) + assert 10 * len(node1.data_directories()) == int(m.group(1)) - debug("Altering the fanout_size") + logger.debug("Altering the fanout_size") session.execute("ALTER TABLE keyspace1.standard1 with compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 'fanout_size':5};") # trigger the compaction @@ -551,12 +563,12 @@ class TestCompaction(Tester): # [0, ?/5, ?/25, ?, 0, 0...] p = re.compile(r'0,\s(\d+)/5,\s(\d+)/25,.*') m = p.search(output) - self.assertEqual(5 * len(node1.data_directories()), int(m.group(1))) - self.assertEqual(25 * len(node1.data_directories()), int(m.group(2))) + assert 5 * len(node1.data_directories()) == int(m.group(1)) + assert 25 * len(node1.data_directories()) == int(m.group(2)) - def skip_if_no_major_compaction(self): - if self.cluster.version() < '2.2' and self.strategy == 'LeveledCompactionStrategy': - self.skipTest('major compaction not implemented for LCS in this version of Cassandra') + def skip_if_no_major_compaction(self, strategy): + if self.cluster.version() < '2.2' and strategy == 'LeveledCompactionStrategy': + pytest.skip(msg='major compaction not implemented for LCS in this version of Cassandra') def grep_sstables_in_each_level(node, table_name): @@ -567,14 +579,8 @@ def grep_sstables_in_each_level(node, table_name): def get_random_word(wordLen, population=string.ascii_letters + string.digits): - return ''.join([random.choice(population) for _ in range(wordLen)]) + return ''.join([random.choice(population) for _ in range(int(wordLen))]) def stress_write(node, keycount=100000): node.stress(['write', 'n={keycount}'.format(keycount=keycount)]) - - -strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy'] -for strategy in strategies: - cls_name = ('TestCompaction_with_' + strategy) - vars()[cls_name] = type(cls_name, (TestCompaction,), 
{'strategy': strategy, '__test__': True}) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/compression_test.py ---------------------------------------------------------------------- diff --git a/compression_test.py b/compression_test.py index c8362c9..d865ba2 100644 --- a/compression_test.py +++ b/compression_test.py @@ -1,9 +1,13 @@ import os +import pytest +import logging from dtest import create_ks from scrub_test import TestHelper from tools.assertions import assert_crc_check_chance_equal -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) class TestCompression(TestHelper): @@ -16,10 +20,10 @@ class TestCompression(TestHelper): with open(file, 'rb') as fh: file_start = fh.read(2) - return types.get(file_start.encode('hex'), 'UNKNOWN') + return types.get(file_start.hex(), 'UNKNOWN') @since("3.0") - def disable_compression_cql_test(self): + def test_disable_compression_cql(self): """ @jira_ticket CASSANDRA-8384 using new cql create table syntax to disable compression @@ -33,7 +37,7 @@ class TestCompression(TestHelper): session.execute("create table disabled_compression_table (id uuid PRIMARY KEY ) WITH compression = {'enabled': false};") session.cluster.refresh_schema_metadata() meta = session.cluster.metadata.keyspaces['ks'].tables['disabled_compression_table'] - self.assertEqual('false', meta.options['compression']['enabled']) + assert 'false' == meta.options['compression']['enabled'] for n in range(0, 100): session.execute("insert into disabled_compression_table (id) values (uuid());") @@ -44,12 +48,12 @@ class TestCompression(TestHelper): for sstable_path in sstable_paths: sstable = os.path.join(sstable_path, sstables['disabled_compression_table'][1]) if os.path.exists(sstable): - self.assertEqual('NONE', self._get_compression_type(sstable)) + assert 'NONE' == self._get_compression_type(sstable) found = True - self.assertTrue(found) + assert found @since("3.0") - def 
compression_cql_options_test(self): + def test_compression_cql_options(self): """ @jira_ticket CASSANDRA-8384 using new cql create table syntax to configure compression @@ -72,12 +76,12 @@ class TestCompression(TestHelper): session.cluster.refresh_schema_metadata() meta = session.cluster.metadata.keyspaces['ks'].tables['compression_opts_table'] - self.assertEqual('org.apache.cassandra.io.compress.DeflateCompressor', meta.options['compression']['class']) - self.assertEqual('256', meta.options['compression']['chunk_length_in_kb']) + assert 'org.apache.cassandra.io.compress.DeflateCompressor' == meta.options['compression']['class'] + assert '256' == meta.options['compression']['chunk_length_in_kb'] assert_crc_check_chance_equal(session, "compression_opts_table", 0.25) warn = node.grep_log("The option crc_check_chance was deprecated as a compression option.") - self.assertEqual(len(warn), 0) + assert len(warn) == 0 session.execute(""" alter table compression_opts_table WITH compression = { @@ -87,13 +91,13 @@ class TestCompression(TestHelper): } """) warn = node.grep_log("The option crc_check_chance was deprecated as a compression option.") - self.assertEqual(len(warn), 1) + assert len(warn) == 1 # check metadata again after crc_check_chance_update session.cluster.refresh_schema_metadata() meta = session.cluster.metadata.keyspaces['ks'].tables['compression_opts_table'] - self.assertEqual('org.apache.cassandra.io.compress.DeflateCompressor', meta.options['compression']['class']) - self.assertEqual('256', meta.options['compression']['chunk_length_in_kb']) + assert 'org.apache.cassandra.io.compress.DeflateCompressor' == meta.options['compression']['class'] + assert '256' == meta.options['compression']['chunk_length_in_kb'] assert_crc_check_chance_equal(session, "compression_opts_table", 0.6) for n in range(0, 100): @@ -105,12 +109,12 @@ class TestCompression(TestHelper): for sstable_path in sstable_paths: sstable = os.path.join(sstable_path, 
sstables['compression_opts_table'][1]) if os.path.exists(sstable): - self.assertEqual('DEFLATE', self._get_compression_type(sstable)) + assert 'DEFLATE' == self._get_compression_type(sstable) found = True - self.assertTrue(found) + assert found @since("3.0") - def compression_cql_disabled_with_alter_test(self): + def test_compression_cql_disabled_with_alter(self): """ @jira_ticket CASSANDRA-8384 starting with compression enabled then disabling it @@ -131,17 +135,17 @@ class TestCompression(TestHelper): AND crc_check_chance = 0.25; """) meta = session.cluster.metadata.keyspaces['ks'].tables['start_enabled_compression_table'] - self.assertEqual('org.apache.cassandra.io.compress.SnappyCompressor', meta.options['compression']['class']) - self.assertEqual('256', meta.options['compression']['chunk_length_in_kb']) + assert 'org.apache.cassandra.io.compress.SnappyCompressor' == meta.options['compression']['class'] + assert '256' == meta.options['compression']['chunk_length_in_kb'] assert_crc_check_chance_equal(session, "start_enabled_compression_table", 0.25) session.execute("alter table start_enabled_compression_table with compression = {'enabled': false};") session.cluster.refresh_schema_metadata() meta = session.cluster.metadata.keyspaces['ks'].tables['start_enabled_compression_table'] - self.assertEqual('false', meta.options['compression']['enabled']) + assert 'false' == meta.options['compression']['enabled'] @since("3.0") - def compression_cql_enabled_with_alter_test(self): + def test_compression_cql_enabled_with_alter(self): """ @jira_ticket CASSANDRA-8384 starting with compression disabled and enabling it @@ -154,7 +158,7 @@ class TestCompression(TestHelper): create_ks(session, 'ks', 1) session.execute("create table start_disabled_compression_table (id uuid PRIMARY KEY ) WITH compression = {'enabled': false};") meta = session.cluster.metadata.keyspaces['ks'].tables['start_disabled_compression_table'] - self.assertEqual('false', 
meta.options['compression']['enabled']) + assert 'false' == meta.options['compression']['enabled'] session.execute("""alter table start_disabled_compression_table WITH compression = { 'class': 'SnappyCompressor', @@ -163,6 +167,6 @@ class TestCompression(TestHelper): session.cluster.refresh_schema_metadata() meta = session.cluster.metadata.keyspaces['ks'].tables['start_disabled_compression_table'] - self.assertEqual('org.apache.cassandra.io.compress.SnappyCompressor', meta.options['compression']['class']) - self.assertEqual('256', meta.options['compression']['chunk_length_in_kb']) + assert 'org.apache.cassandra.io.compress.SnappyCompressor' == meta.options['compression']['class'] + assert '256' == meta.options['compression']['chunk_length_in_kb'] assert_crc_check_chance_equal(session, "start_disabled_compression_table", 0.25) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/concurrent_schema_changes_test.py ---------------------------------------------------------------------- diff --git a/concurrent_schema_changes_test.py b/concurrent_schema_changes_test.py index 49041f9..d0af49c 100644 --- a/concurrent_schema_changes_test.py +++ b/concurrent_schema_changes_test.py @@ -3,15 +3,19 @@ import os import pprint import re import time +import pytest +import logging + from random import randrange from threading import Thread -from unittest import skip from cassandra.concurrent import execute_concurrent from ccmlib.node import Node -from dtest import Tester, debug, create_ks -from tools.decorators import since +from dtest import Tester, create_ks + +since = pytest.mark.since +logger = logging.getLogger(__name__) def wait(delay=2): @@ -21,7 +25,7 @@ def wait(delay=2): time.sleep(delay) -@skip('awaiting CASSANDRA-10699') +@pytest.mark.skip(reason='awaiting CASSANDRA-10699') class TestConcurrentSchemaChanges(Tester): allow_log_errors = True @@ -29,7 +33,7 @@ class TestConcurrentSchemaChanges(Tester): """ prepares for schema changes by creating a keyspace 
and column family. """ - debug("prepare_for_changes() " + str(namespace)) + logger.debug("prepare_for_changes() " + str(namespace)) # create a keyspace that will be used create_ks(session, "ks_%s" % namespace, 2) session.execute('USE ks_%s' % namespace) @@ -77,7 +81,7 @@ class TestConcurrentSchemaChanges(Tester): rebuild index (via jmx) set default_validation_class """ - debug("make_schema_changes() " + str(namespace)) + logger.debug("make_schema_changes() " + str(namespace)) session.execute('USE ks_%s' % namespace) # drop keyspace session.execute('DROP KEYSPACE ks2_%s' % namespace) @@ -117,14 +121,14 @@ class TestConcurrentSchemaChanges(Tester): def validate_schema_consistent(self, node): """ Makes sure that there is only one schema """ - debug("validate_schema_consistent() " + node.name) + logger.debug("validate_schema_consistent() " + node.name) response = node.nodetool('describecluster').stdout schemas = response.split('Schema versions:')[1].strip() num_schemas = len(re.findall('\[.*?\]', schemas)) - self.assertEqual(num_schemas, 1, "There were multiple schema versions: {}".format(pprint.pformat(schemas))) + assert num_schemas, 1 == "There were multiple schema versions: {}".format(pprint.pformat(schemas)) - def create_lots_of_tables_concurrently_test(self): + def test_create_lots_of_tables_concurrently(self): """ create tables across multiple threads concurrently """ @@ -141,18 +145,18 @@ class TestConcurrentSchemaChanges(Tester): results = execute_concurrent(session, cmds, raise_on_first_error=True, concurrency=200) for (success, result) in results: - self.assertTrue(success, "didn't get success on table create: {}".format(result)) + assert success, "didn't get success on table create: {}".format(result) wait(10) session.cluster.refresh_schema_metadata() table_meta = session.cluster.metadata.keyspaces["lots_o_tables"].tables - self.assertEqual(250, len(table_meta)) + assert 250 == len(table_meta) self.validate_schema_consistent(node1) 
self.validate_schema_consistent(node2) self.validate_schema_consistent(node3) - def create_lots_of_alters_concurrently_test(self): + def test_create_lots_of_alters_concurrently(self): """ create alters across multiple threads concurrently """ @@ -169,26 +173,26 @@ class TestConcurrentSchemaChanges(Tester): cmds = [("alter table base_{0} add c_{1} int".format(randrange(0, 10), n), ()) for n in range(500)] - debug("executing 500 alters") + logger.debug("executing 500 alters") results = execute_concurrent(session, cmds, raise_on_first_error=True, concurrency=150) for (success, result) in results: - self.assertTrue(success, "didn't get success on table create: {}".format(result)) + assert success, "didn't get success on table create: {}".format(result) - debug("waiting for alters to propagate") + logger.debug("waiting for alters to propagate") wait(30) session.cluster.refresh_schema_metadata() table_meta = session.cluster.metadata.keyspaces["lots_o_alters"].tables - column_ct = sum([len(table.columns) for table in table_meta.values()]) + column_ct = sum([len(table.columns) for table in list(table_meta.values())]) # primary key + alters - self.assertEqual(510, column_ct) + assert 510 == column_ct self.validate_schema_consistent(node1) self.validate_schema_consistent(node2) self.validate_schema_consistent(node3) - def create_lots_of_indexes_concurrently_test(self): + def test_create_lots_of_indexes_concurrently(self): """ create indexes across multiple threads concurrently """ @@ -205,7 +209,7 @@ class TestConcurrentSchemaChanges(Tester): session.execute("insert into base_{0} (id, c1, c2) values (uuid(), {1}, {2})".format(n, ins, ins)) wait(5) - debug("creating indexes") + logger.debug("creating indexes") cmds = [] for n in range(5): cmds.append(("create index ix_base_{0}_c1 on base_{0} (c1)".format(n), ())) @@ -214,31 +218,31 @@ class TestConcurrentSchemaChanges(Tester): results = execute_concurrent(session, cmds, raise_on_first_error=True) for (success, result) in 
results: - self.assertTrue(success, "didn't get success on table create: {}".format(result)) + assert success, "didn't get success on table create: {}".format(result) wait(5) - debug("validating schema and index list") + logger.debug("validating schema and index list") session.cluster.control_connection.wait_for_schema_agreement() session.cluster.refresh_schema_metadata() index_meta = session.cluster.metadata.keyspaces["lots_o_indexes"].indexes self.validate_schema_consistent(node1) self.validate_schema_consistent(node2) - self.assertEqual(10, len(index_meta)) + assert 10 == len(index_meta) for n in range(5): - self.assertIn("ix_base_{0}_c1".format(n), index_meta) - self.assertIn("ix_base_{0}_c2".format(n), index_meta) + assert "ix_base_{0}_c1".format(n) in index_meta + assert "ix_base_{0}_c2".format(n) in index_meta - debug("waiting for indexes to fill in") + logger.debug("waiting for indexes to fill in") wait(45) - debug("querying all values by secondary index") + logger.debug("querying all values by secondary index") for n in range(5): for ins in range(1000): - self.assertEqual(1, len(list(session.execute("select * from base_{0} where c1 = {1}".format(n, ins))))) - self.assertEqual(1, len(list(session.execute("select * from base_{0} where c2 = {1}".format(n, ins))))) + assert 1 == len(list(session.execute("select * from base_{0} where c1 = {1}".format(n, ins)))) + assert 1 == len(list(session.execute("select * from base_{0} where c2 = {1}".format(n, )))) @since('3.0') - def create_lots_of_mv_concurrently_test(self): + def test_create_lots_of_mv_concurrently(self): """ create materialized views across multiple threads concurrently """ @@ -261,15 +265,15 @@ class TestConcurrentSchemaChanges(Tester): "WHERE c{0} IS NOT NULL AND id IS NOT NULL PRIMARY KEY (c{0}, id)".format(n))) session.cluster.control_connection.wait_for_schema_agreement() - debug("waiting for indexes to fill in") + logger.debug("waiting for indexes to fill in") wait(60) result = 
list(session.execute(("SELECT * FROM system_schema.views " "WHERE keyspace_name='lots_o_views' AND base_table_name='source_data' ALLOW FILTERING"))) - self.assertEqual(10, len(result), "missing some mv from source_data table") + assert 10, len(result) == "missing some mv from source_data table" for n in range(1, 11): result = list(session.execute("select * from src_by_c{0}".format(n))) - self.assertEqual(4000, len(result)) + assert 4000 == len(result) def _do_lots_of_schema_actions(self, session): for n in range(20): @@ -287,7 +291,7 @@ class TestConcurrentSchemaChanges(Tester): results = execute_concurrent(session, cmds, concurrency=100, raise_on_first_error=True) for (success, result) in results: - self.assertTrue(success, "didn't get success: {}".format(result)) + assert success, "didn't get success: {}".format(result) def _verify_lots_of_schema_actions(self, session): session.cluster.control_connection.wait_for_schema_agreement() @@ -302,7 +306,7 @@ class TestConcurrentSchemaChanges(Tester): table_meta = session.cluster.metadata.keyspaces["lots_o_churn"].tables errors = [] for n in range(20): - self.assertTrue("new_table_{0}".format(n) in table_meta) + assert "new_table_{0}".format(n) in table_meta if 7 != len(table_meta["index_me_{0}".format(n)].indexes): errors.append("index_me_{0} expected indexes ix_index_me_c0->7, got: {1}".format(n, sorted(list(table_meta["index_me_{0}".format(n)].indexes)))) @@ -313,9 +317,9 @@ class TestConcurrentSchemaChanges(Tester): if 8 != len(altered.columns): errors.append("alter_me_{0} expected c1 -> c7, id, got: {1}".format(n, sorted(list(altered.columns)))) - self.assertTrue(0 == len(errors), "\n".join(errors)) + assert 0 == len(errors), "\n".join(errors) - def create_lots_of_schema_churn_test(self): + def test_create_lots_of_schema_churn(self): """ create tables, indexes, alters across multiple threads concurrently """ @@ -327,11 +331,11 @@ class TestConcurrentSchemaChanges(Tester): session.execute("use lots_o_churn") 
self._do_lots_of_schema_actions(session) - debug("waiting for things to settle and sync") + logger.debug("waiting for things to settle and sync") wait(60) self._verify_lots_of_schema_actions(session) - def create_lots_of_schema_churn_with_node_down_test(self): + def test_create_lots_of_schema_churn_with_node_down(self): """ create tables, indexes, alters across multiple threads concurrently with a node down """ @@ -346,15 +350,15 @@ class TestConcurrentSchemaChanges(Tester): self._do_lots_of_schema_actions(session) wait(15) node2.start(wait_other_notice=True) - debug("waiting for things to settle and sync") + logger.debug("waiting for things to settle and sync") wait(120) self._verify_lots_of_schema_actions(session) - def basic_test(self): + def test_basic(self): """ make several schema changes on the same node. """ - debug("basic_test()") + logger.debug("basic_test()") cluster = self.cluster cluster.populate(2).start() @@ -366,8 +370,8 @@ class TestConcurrentSchemaChanges(Tester): self.make_schema_changes(session, namespace='ns1') - def changes_to_different_nodes_test(self): - debug("changes_to_different_nodes_test()") + def test_changes_to_different_nodes(self): + logger.debug("changes_to_different_nodes_test()") cluster = self.cluster cluster.populate(2).start() node1, node2 = cluster.nodelist() @@ -389,13 +393,13 @@ class TestConcurrentSchemaChanges(Tester): # check both, just because we can self.validate_schema_consistent(node2) - def changes_while_node_down_test(self): + def test_changes_while_node_down(self): """ makes schema changes while a node is down. Make schema changes to node 1 while node 2 is down. Then bring up 2 and make sure it gets the changes. 
""" - debug("changes_while_node_down_test()") + logger.debug("changes_while_node_down_test()") cluster = self.cluster cluster.populate(2).start() node1, node2 = cluster.nodelist() @@ -414,7 +418,7 @@ class TestConcurrentSchemaChanges(Tester): wait(20) self.validate_schema_consistent(node1) - def changes_while_node_toggle_test(self): + def test_changes_while_node_toggle(self): """ makes schema changes while a node is down. @@ -422,7 +426,7 @@ class TestConcurrentSchemaChanges(Tester): Bring down 2, bring up 1, and finally bring up 2. 1 should get the changes. """ - debug("changes_while_node_toggle_test()") + logger.debug("changes_while_node_toggle_test()") cluster = self.cluster cluster.populate(2).start() node1, node2 = cluster.nodelist() @@ -441,8 +445,8 @@ class TestConcurrentSchemaChanges(Tester): wait(20) self.validate_schema_consistent(node1) - def decommission_node_test(self): - debug("decommission_node_test()") + def test_decommission_node(self): + logger.debug("decommission_node_test()") cluster = self.cluster cluster.populate(1) @@ -490,8 +494,8 @@ class TestConcurrentSchemaChanges(Tester): wait(30) self.validate_schema_consistent(node1) - def snapshot_test(self): - debug("snapshot_test()") + def test_snapshot(self): + logger.debug("snapshot_test()") cluster = self.cluster cluster.populate(2).start() node1, node2 = cluster.nodelist() @@ -535,11 +539,11 @@ class TestConcurrentSchemaChanges(Tester): wait(2) self.validate_schema_consistent(node1) - def load_test(self): + def test_load(self): """ apply schema changes while the cluster is under load. 
""" - debug("load_test()") + logger.debug("load_test()") cluster = self.cluster cluster.populate(1).start() @@ -548,14 +552,14 @@ class TestConcurrentSchemaChanges(Tester): session = self.cql_connection(node1) def stress(args=[]): - debug("Stressing") + logger.debug("Stressing") node1.stress(args) - debug("Done Stressing") + logger.debug("Done Stressing") def compact(): - debug("Compacting...") + logger.debug("Compacting...") node1.nodetool('compact') - debug("Done Compacting.") + logger.debug("Done Compacting.") # put some data into the cluster stress(['write', 'n=30000', 'no-warmup', '-rate', 'threads=8']) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/configuration_test.py ---------------------------------------------------------------------- diff --git a/configuration_test.py b/configuration_test.py index 2f696eb..6bb5e95 100644 --- a/configuration_test.py +++ b/configuration_test.py @@ -1,16 +1,30 @@ import os - +import logging import parse +import pytest + from cassandra.concurrent import execute_concurrent_with_args -from dtest import Tester, debug, create_ks +from tools.misc import ImmutableMapping +from dtest_setup_overrides import DTestSetupOverrides +from dtest import Tester, create_ks from tools.jmxutils import (JolokiaAgent, make_mbean, remove_perf_disable_shared_mem) +logger = logging.getLogger(__name__) + + [email protected]() +def fixture_dtest_setup_overrides(request): + dtest_setup_overrides = DTestSetupOverrides() + if request.node.name == "test_change_durable_writes": + dtest_setup_overrides.cluster_options = ImmutableMapping({'commitlog_segment_size_in_mb': 1}) + return dtest_setup_overrides + class TestConfiguration(Tester): - def compression_chunk_length_test(self): + def test_compression_chunk_length(self): """ Verify the setting of compression chunk_length [#3558]""" cluster = self.cluster @@ -20,7 +34,9 @@ class TestConfiguration(Tester): create_ks(session, 'ks', 1) create_table_query = "CREATE TABLE test_table (row 
varchar, name varchar, value int, PRIMARY KEY (row, name));" - alter_chunk_len_query = "ALTER TABLE test_table WITH compression = {{'sstable_compression' : 'SnappyCompressor', 'chunk_length_kb' : {chunk_length}}};" + alter_chunk_len_query = "ALTER TABLE test_table WITH " \ + "compression = {{'sstable_compression' : 'SnappyCompressor', " \ + "'chunk_length_kb' : {chunk_length}}};" session.execute(create_table_query) @@ -30,7 +46,8 @@ class TestConfiguration(Tester): session.execute(alter_chunk_len_query.format(chunk_length=64)) self._check_chunk_length(session, 64) - def change_durable_writes_test(self): + @pytest.mark.timeout(60*30) + def test_change_durable_writes(self): """ @jira_ticket CASSANDRA-9560 @@ -51,15 +68,14 @@ class TestConfiguration(Tester): """ def new_commitlog_cluster_node(): # writes should block on commitlog fsync - self.cluster.populate(1) - node = self.cluster.nodelist()[0] - self.cluster.set_configuration_options(values={'commitlog_segment_size_in_mb': 1}) - self.cluster.set_batch_commitlog(enabled=True) + self.fixture_dtest_setup.cluster.populate(1) + node = self.fixture_dtest_setup.cluster.nodelist()[0] + self.fixture_dtest_setup.cluster.set_batch_commitlog(enabled=True) # disable JVM option so we can use Jolokia - # this has to happen after .set_configuration_options because of implmentation details + # this has to happen after .set_configuration_options because of implementation details remove_perf_disable_shared_mem(node) - self.cluster.start(wait_for_binary_proto=True) + self.fixture_dtest_setup.cluster.start(wait_for_binary_proto=True) return node durable_node = new_commitlog_cluster_node() @@ -70,16 +86,15 @@ class TestConfiguration(Tester): durable_session.execute("CREATE KEYSPACE ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1} " "AND DURABLE_WRITES = true") durable_session.execute('CREATE TABLE ks.tab (key int PRIMARY KEY, a int, b int, c int)') - debug('commitlog size diff = ' + 
str(commitlog_size(durable_node) - durable_init_size)) + logger.debug('commitlog size diff = ' + str(commitlog_size(durable_node) - durable_init_size)) write_to_trigger_fsync(durable_session, 'ks', 'tab') - self.assertGreater(commitlog_size(durable_node), durable_init_size, - msg='This test will not work in this environment; ' - 'write_to_trigger_fsync does not trigger fsync.') + assert commitlog_size(durable_node) > durable_init_size, \ + "This test will not work in this environment; write_to_trigger_fsync does not trigger fsync." # get a fresh cluster to work on - self.tearDown() - self.setUp() + durable_session.shutdown() + self.fixture_dtest_setup.cleanup_and_replace_cluster() node = new_commitlog_cluster_node() init_size = commitlog_size(node) @@ -91,8 +106,7 @@ class TestConfiguration(Tester): session.execute('CREATE TABLE ks.tab (key int PRIMARY KEY, a int, b int, c int)') session.execute('ALTER KEYSPACE ks WITH DURABLE_WRITES=true') write_to_trigger_fsync(session, 'ks', 'tab') - self.assertGreater(commitlog_size(node), init_size, - msg='ALTER KEYSPACE was not respected') + assert commitlog_size(node) > init_size, "ALTER KEYSPACE was not respected" def overlapping_data_folders(self): """ @@ -130,12 +144,13 @@ class TestConfiguration(Tester): if 'compression' in result: params = result - self.assertNotEqual(params, '', "Looking for the string 'sstable_compression', but could not find it in {str}".format(str=result)) + assert not params == '', "Looking for the string 'sstable_compression', but could not find " \ + "it in {str}".format(str=result) chunk_string = "chunk_length_kb" if self.cluster.version() < '3.0' else "chunk_length_in_kb" chunk_length = parse.search("'" + chunk_string + "': '{chunk_length:d}'", result).named['chunk_length'] - self.assertEqual(chunk_length, value, "Expected chunk_length: {}. We got: {}".format(value, chunk_length)) + assert chunk_length == value, "Expected chunk_length: {}. 
We got: {}".format(value, chunk_length) def write_to_trigger_fsync(session, ks, table): @@ -145,9 +160,17 @@ def write_to_trigger_fsync(session, ks, table): commitlog_segment_size_in_mb is 1. Assumes the table's columns are (key int, a int, b int, c int). """ + """ + From https://github.com/datastax/python-driver/pull/877/files + "Note: in the case that `generators` are used, it is important to ensure the consumers do not + block or attempt further synchronous requests, because no further IO will be processed until + the consumer returns. This may also produce a deadlock in the IO event thread." + """ execute_concurrent_with_args(session, - session.prepare('INSERT INTO "{ks}"."{table}" (key, a, b, c) VALUES (?, ?, ?, ?)'.format(ks=ks, table=table)), - ((x, x + 1, x + 2, x + 3) for x in range(50000))) + session.prepare('INSERT INTO "{ks}"."{table}" (key, a, b, c) ' + 'VALUES (?, ?, ?, ?)'.format(ks=ks, table=table)), + ((x, x + 1, x + 2, x + 3) + for x in range(50000)), concurrency=5) def commitlog_size(node): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
