http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/commitlog_test.py
----------------------------------------------------------------------
diff --git a/commitlog_test.py b/commitlog_test.py
index 99c4caf..c8830cd 100644
--- a/commitlog_test.py
+++ b/commitlog_test.py
@@ -5,6 +5,8 @@ import stat
 import struct
 import time
 from distutils.version import LooseVersion
+import pytest
+import logging
 
 from cassandra import WriteTimeout
 from cassandra.cluster import NoHostAvailable, OperationTimedOut
@@ -12,28 +14,33 @@ from ccmlib.common import is_win
 from ccmlib.node import Node, TimeoutError
 from parse import parse
 
-from dtest import Tester, debug, create_ks
-from tools.assertions import assert_almost_equal, assert_none, assert_one
+from dtest import Tester, create_ks
+from tools.assertions import (assert_almost_equal, assert_none, assert_one, assert_lists_equal_ignoring_order)
 from tools.data import rows_to_list
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class TestCommitLog(Tester):
     """
     CommitLog Tests
     """
-    allow_log_errors = True
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.allow_log_errors = True
+
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_set_cluster_settings(self, fixture_dtest_setup):
+        fixture_dtest_setup.cluster.populate(1)
+        [self.node1] = fixture_dtest_setup.cluster.nodelist()
 
-    def setUp(self):
-        super(TestCommitLog, self).setUp()
-        self.cluster.populate(1)
-        [self.node1] = self.cluster.nodelist()
+        yield
 
-    def tearDown(self):
         # Some of the tests change commitlog permissions to provoke failure
         # so this changes them back so we can delete them.
        self._change_commitlog_perms(stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
-        super(TestCommitLog, self).tearDown()
+
 
     def prepare(self, configuration=None, create_test_keyspace=True, **kwargs):
         if configuration is None:
@@ -41,7 +48,7 @@ class TestCommitLog(Tester):
         default_conf = {'commitlog_sync_period_in_ms': 1000}
 
         set_conf = dict(default_conf, **configuration)
-        debug('setting commitlog configuration with the following values: '
+        logger.debug('setting commitlog configuration with the following values: '
               '{set_conf} and the following kwargs: {kwargs}'.format(
                   set_conf=set_conf, kwargs=kwargs))
         self.cluster.set_configuration_options(values=set_conf, **kwargs)
@@ -61,15 +68,15 @@ class TestCommitLog(Tester):
 
     def _change_commitlog_perms(self, mod):
         for path in self._get_commitlog_paths():
-            debug('changing permissions to {perms} on {path}'.format(perms=oct(mod), path=path))
+            logger.debug('changing permissions to {perms} on {path}'.format(perms=oct(mod), path=path))
             os.chmod(path, mod)
             commitlogs = glob.glob(path + '/*')
 
             if commitlogs:
-                debug('changing permissions to {perms} on the following files:'
+                logger.debug('changing permissions to {perms} on the following files:'
                       '\n  {files}'.format(perms=oct(mod), files='\n  '.join(commitlogs)))
             else:
-                debug(self._change_commitlog_perms.__name__ + ' called on empty commitlog directory '
+                logger.debug(self._change_commitlog_perms.__name__ + ' called on empty commitlog directory '
                      '{path} with permissions {perms}'.format(path=path, perms=oct(mod)))
 
             for commitlog in commitlogs:
@@ -108,7 +115,7 @@ class TestCommitLog(Tester):
         time.sleep(1)
 
         commitlogs = self._get_commitlog_files()
-        self.assertGreater(len(commitlogs), 0, 'No commit log files were created')
+        assert len(commitlogs) > 0, 'No commit log files were created'
 
         # the most recently-written segment of the commitlog may be smaller
         # than the expected size, so we allow exactly one segment to be smaller
@@ -116,23 +123,23 @@ class TestCommitLog(Tester):
         for i, f in enumerate(commitlogs):
             size = os.path.getsize(f)
             size_in_mb = int(size / 1024 / 1024)
-            debug('segment file {} {}; smaller already found: {}'.format(f, size_in_mb, smaller_found))
+            logger.debug('segment file {} {}; smaller already found: {}'.format(f, size_in_mb, smaller_found))
             if size_in_mb < 1 or size < (segment_size * 0.1):
-                debug('segment file not yet used; moving to next file')
+                logger.debug('segment file not yet used; moving to next file')
                 continue  # commitlog not yet used
 
             try:
                 if compressed:
                     # if compression is used, we assume there will be at most a 50% compression ratio
-                    self.assertLess(size, segment_size)
-                    self.assertGreater(size, segment_size / 2)
+                    assert size < segment_size
+                    assert size > segment_size / 2
                 else:
                     # if no compression is used, the size will be close to what we expect
                     assert_almost_equal(size, segment_size, error=0.05)
             except AssertionError as e:
                 # the last segment may be smaller
                 if not smaller_found:
-                    self.assertLessEqual(size, segment_size)
+                    assert size <= segment_size
                     smaller_found = True
                 else:
                     raise e
@@ -141,7 +148,7 @@ class TestCommitLog(Tester):
         """
         Provoke the commitlog failure
         """
-        debug('Provoking commitlog failure')
+        logger.debug('Provoking commitlog failure')
         # Test things are ok at this point
         self.session1.execute("""
             INSERT INTO test (key, col1) VALUES (1, 1);
@@ -164,17 +171,16 @@ class TestCommitLog(Tester):
         replay due to MV lock contention.  Fixed in 3.0.7 and 3.7.
         @jira_ticket CASSANDRA-11891
         """
-
         cluster_ver = self.cluster.version()
         if LooseVersion('3.1') <= cluster_ver < LooseVersion('3.7'):
-            self.skipTest("Fixed in 3.0.7 and 3.7")
+            pytest.skip("Fixed in 3.0.7 and 3.7")
 
         node1 = self.node1
         node1.set_batch_commitlog(enabled=True)
         node1.start()
         session = self.patient_cql_connection(node1)
 
-        debug("Creating schema")
+        logger.debug("Creating schema")
         create_ks(session, 'Test', 1)
         session.execute("""
             CREATE TABLE mytable (
@@ -192,37 +198,37 @@ class TestCommitLog(Tester):
             PRIMARY KEY (a, b);
         """)
 
-        debug("Insert data")
+        logger.debug("Insert data")
         num_rows = 1024  # maximum number of mutations replayed at once by the commit log
-        for i in xrange(num_rows):
+        for i in range(num_rows):
             session.execute("INSERT INTO Test.mytable (a, b, c) VALUES (0, 
{i}, {i})".format(i=i))
 
         node1.stop(gently=False)
         node1.mark_log_for_errors()
 
-        debug("Verify commitlog was written before abrupt stop")
+        logger.debug("Verify commitlog was written before abrupt stop")
         commitlog_files = os.listdir(os.path.join(node1.get_path(), 'commitlogs'))
-        self.assertNotEqual([], commitlog_files)
+        assert [] != commitlog_files
 
         # set a short timeout to ensure lock contention will generally exceed this
         node1.set_configuration_options({'write_request_timeout_in_ms': 30})
-        debug("Starting node again")
+        logger.debug("Starting node again")
         node1.start()
 
-        debug("Verify commit log was replayed on startup")
+        logger.debug("Verify commit log was replayed on startup")
         start_time, replay_complete = time.time(), False
         while not replay_complete:
             matches = node1.grep_log(r".*WriteTimeoutException.*")
-            self.assertEqual([], matches)
+            assert [] == matches
 
             replay_complete = node1.grep_log("Log replay complete")
-            self.assertLess(time.time() - start_time, 120, "Did not finish commitlog replay within 120 seconds")
+            assert time.time() - start_time < 120, "Did not finish commitlog replay within 120 seconds"
 
-        debug("Reconnecting to node")
+        logger.debug("Reconnecting to node")
         session = self.patient_cql_connection(node1)
-        debug("Make query to ensure data is present")
+        logger.debug("Make query to ensure data is present")
         res = list(session.execute("SELECT * FROM Test.mytable"))
-        self.assertEqual(num_rows, len(res), res)
+        assert num_rows == len(res), res
 
     def test_commitlog_replay_on_startup(self):
         """
@@ -232,7 +238,7 @@ class TestCommitLog(Tester):
         node1.set_batch_commitlog(enabled=True)
         node1.start()
 
-        debug("Insert data")
+        logger.debug("Insert data")
         session = self.patient_cql_connection(node1)
         create_ks(session, 'Test', 1)
         session.execute("""
@@ -247,69 +253,67 @@ class TestCommitLog(Tester):
         session.execute("INSERT INTO Test. users (user_name, password, gender, 
state, birth_year) "
                         "VALUES('gandalf', 'p@$$', 'male', 'WA', 1955);")
 
-        debug("Verify data is present")
+        logger.debug("Verify data is present")
         session = self.patient_cql_connection(node1)
         res = session.execute("SELECT * FROM Test. users")
-        self.assertItemsEqual(rows_to_list(res),
-                              [[u'gandalf', 1955, u'male', u'p@$$', u'WA']])
+        assert rows_to_list(res) == [['gandalf', 1955, 'male', 'p@$$', 'WA']]
 
-        debug("Stop node abruptly")
+        logger.debug("Stop node abruptly")
         node1.stop(gently=False)
 
-        debug("Verify commitlog was written before abrupt stop")
+        logger.debug("Verify commitlog was written before abrupt stop")
         commitlog_dir = os.path.join(node1.get_path(), 'commitlogs')
         commitlog_files = os.listdir(commitlog_dir)
-        self.assertTrue(len(commitlog_files) > 0)
+        assert len(commitlog_files) > 0
 
-        debug("Verify no SSTables were flushed before abrupt stop")
-        self.assertEqual(0, len(node1.get_sstables('test', 'users')))
+        logger.debug("Verify no SSTables were flushed before abrupt stop")
+        assert 0 == len(node1.get_sstables('test', 'users'))
 
-        debug("Verify commit log was replayed on startup")
+        logger.debug("Verify commit log was replayed on startup")
         node1.start()
         node1.watch_log_for("Log replay complete")
         # Here we verify from the logs that some mutations were replayed
         replays = [match_tuple[0] for match_tuple in node1.grep_log(" \d+ replayed mutations")]
-        debug('The following log lines indicate that mutations were replayed: {msgs}'.format(msgs=replays))
+        logger.debug('The following log lines indicate that mutations were replayed: {msgs}'.format(msgs=replays))
         num_replayed_mutations = [
             parse('{} {num_mutations:d} replayed mutations{}', line).named['num_mutations']
             for line in replays
         ]
         # assert there were some lines where more than zero mutations were replayed
-        self.assertNotEqual([m for m in num_replayed_mutations if m > 0], [])
+        assert [m for m in num_replayed_mutations if m > 0] != []
 
-        debug("Make query and ensure data is present")
+        logger.debug("Make query and ensure data is present")
         session = self.patient_cql_connection(node1)
         res = session.execute("SELECT * FROM Test. users")
-        self.assertItemsEqual(rows_to_list(res),
-                              [[u'gandalf', 1955, u'male', u'p@$$', u'WA']])
+        assert_lists_equal_ignoring_order(rows_to_list(res), [['gandalf', 1955, 'male', 'p@$$', 'WA']])
 
-    def default_segment_size_test(self):
+    def test_default_segment_size(self):
         """
         Test default commitlog_segment_size_in_mb (32MB)
         """
         self._segment_size_test(32)
 
-    def small_segment_size_test(self):
+    def test_small_segment_size(self):
         """
         Test a small commitlog_segment_size_in_mb (5MB)
         """
         self._segment_size_test(5)
 
     @since('2.2')
-    def default_compressed_segment_size_test(self):
+    def test_default_compressed_segment_size(self):
         """
         Test default compressed commitlog_segment_size_in_mb (32MB)
         """
         self._segment_size_test(32, compressed=True)
 
     @since('2.2')
-    def small_compressed_segment_size_test(self):
+    def test_small_compressed_segment_size(self):
         """
         Test a small compressed commitlog_segment_size_in_mb (5MB)
         """
         self._segment_size_test(5, compressed=True)
 
-    def stop_failure_policy_test(self):
+    def test_stop_failure_policy(self):
         """
         Test the stop commitlog failure policy (default one)
         """
@@ -317,23 +321,23 @@ class TestCommitLog(Tester):
 
         self._provoke_commitlog_failure()
         failure = self.node1.grep_log("Failed .+ commit log segments. Commit 
disk failure policy is stop; terminating thread")
-        debug(failure)
-        self.assertTrue(failure, "Cannot find the commitlog failure message in 
logs")
-        self.assertTrue(self.node1.is_running(), "Node1 should still be 
running")
+        logger.debug(failure)
+        assert failure, "Cannot find the commitlog failure message in logs"
+        assert self.node1.is_running(), "Node1 should still be running"
 
         # Cannot write anymore after the failure
-        with self.assertRaises(NoHostAvailable):
+        with pytest.raises(NoHostAvailable):
             self.session1.execute("""
               INSERT INTO test (key, col1) VALUES (2, 2);
             """)
 
         # Should not be able to read neither
-        with self.assertRaises(NoHostAvailable):
+        with pytest.raises(NoHostAvailable):
             self.session1.execute("""
               "SELECT * FROM test;"
             """)
 
-    def stop_commit_failure_policy_test(self):
+    def test_stop_commit_failure_policy(self):
         """
         Test the stop_commit commitlog failure policy
         """
@@ -347,26 +351,26 @@ class TestCommitLog(Tester):
 
         self._provoke_commitlog_failure()
         failure = self.node1.grep_log("Failed .+ commit log segments. Commit 
disk failure policy is stop_commit; terminating thread")
-        debug(failure)
-        self.assertTrue(failure, "Cannot find the commitlog failure message in 
logs")
-        self.assertTrue(self.node1.is_running(), "Node1 should still be 
running")
+        logger.debug(failure)
+        assert failure, "Cannot find the commitlog failure message in logs"
+        assert self.node1.is_running(), "Node1 should still be running"
 
         # Cannot write anymore after the failure
-        debug('attempting to insert to node with failing commitlog; should fail')
-        with self.assertRaises((OperationTimedOut, WriteTimeout)):
+        logger.debug('attempting to insert to node with failing commitlog; should fail')
+        with pytest.raises((OperationTimedOut, WriteTimeout)):
             self.session1.execute("""
               INSERT INTO test (key, col1) VALUES (2, 2);
             """)
 
         # Should be able to read
-        debug('attempting to read from node with failing commitlog; should succeed')
+        logger.debug('attempting to read from node with failing commitlog; should succeed')
         assert_one(
             self.session1,
             "SELECT * FROM test where key=2;",
             [2, 2]
         )
 
-    def die_failure_policy_test(self):
+    def test_die_failure_policy(self):
         """
         Test the die commitlog failure policy
         """
@@ -376,11 +380,11 @@ class TestCommitLog(Tester):
 
         self._provoke_commitlog_failure()
         failure = self.node1.grep_log("ERROR \[COMMIT-LOG-ALLOCATOR\].+JVM 
state determined to be unstable.  Exiting forcefully")
-        debug(failure)
-        self.assertTrue(failure, "Cannot find the commitlog failure message in 
logs")
-        self.assertFalse(self.node1.is_running(), "Node1 should not be 
running")
+        logger.debug(failure)
+        assert failure, "Cannot find the commitlog failure message in logs"
+        assert not self.node1.is_running(), "Node1 should not be running"
 
-    def ignore_failure_policy_test(self):
+    def test_ignore_failure_policy(self):
         """
         Test the ignore commitlog failure policy
         """
@@ -390,8 +394,8 @@ class TestCommitLog(Tester):
 
         self._provoke_commitlog_failure()
         failure = self.node1.grep_log("ERROR \[COMMIT-LOG-ALLOCATOR\].+Failed 
.+ commit log segments")
-        self.assertTrue(failure, "Cannot find the commitlog failure message in 
logs")
-        self.assertTrue(self.node1.is_running(), "Node1 should still be 
running")
+        assert failure, "Cannot find the commitlog failure message in logs"
+        assert self.node1.is_running(), "Node1 should still be running"
 
         # on Windows, we can't delete the segments if they're chmod to 0 so they'll still be available for use by CLSM,
         # and we can still create new segments since os.chmod is limited to stat.S_IWRITE and stat.S_IREAD to set files
@@ -401,10 +405,10 @@ class TestCommitLog(Tester):
         if is_win():
             # We expect this to succeed
             self.session1.execute(query)
-            self.assertFalse(self.node1.grep_log("terminating thread"), 
"thread was terminated but CL error should have been ignored.")
-            self.assertTrue(self.node1.is_running(), "Node1 should still be 
running after an ignore error on CL")
+            assert not self.node1.grep_log("terminating thread"), "thread was 
terminated but CL error should have been ignored."
+            assert self.node1.is_running(), "Node1 should still be running 
after an ignore error on CL"
         else:
-            with self.assertRaises((OperationTimedOut, WriteTimeout)):
+            with pytest.raises((OperationTimedOut, WriteTimeout)):
                 self.session1.execute(query)
 
             # Should not exist
@@ -436,13 +440,11 @@ class TestCommitLog(Tester):
         and the commit_failure_policy is stop, C* shouldn't startup
         @jira_ticket CASSANDRA-9749
         """
-        if not hasattr(self, 'ignore_log_patterns'):
-            self.ignore_log_patterns = []
-
         expected_error = "Exiting due to error while processing commit log 
during initialization."
-        self.ignore_log_patterns.append(expected_error)
+        self.fixture_dtest_setup.ignore_log_patterns = 
list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            expected_error]
         node = self.node1
-        self.assertIsInstance(node, Node)
+        assert isinstance(node, Node)
         node.set_configuration_options({'commit_failure_policy': 'stop', 'commitlog_sync_period_in_ms': 1000})
         self.cluster.start()
 
@@ -454,7 +456,7 @@ class TestCommitLog(Tester):
             cursor.execute("INSERT INTO ks.tbl (k, v) VALUES ({0}, 
{0})".format(i))
 
         results = list(cursor.execute("SELECT * FROM ks.tbl"))
-        self.assertEqual(len(results), 10)
+        assert len(results) == 10
 
         # with the commitlog_sync_period_in_ms set to 1000,
         # this sleep guarantees that the commitlog data is
@@ -469,14 +471,14 @@ class TestCommitLog(Tester):
             ks_dir = os.path.join(data_dir, 'ks')
             db_dir = os.listdir(ks_dir)[0]
             sstables = len([f for f in os.listdir(os.path.join(ks_dir, db_dir)) if f.endswith('.db')])
-            self.assertEqual(sstables, 0)
+            assert sstables == 0
 
         # modify the commit log crc values
         cl_dir = os.path.join(path, 'commitlogs')
-        self.assertTrue(len(os.listdir(cl_dir)) > 0)
+        assert len(os.listdir(cl_dir)) > 0
         for cl in os.listdir(cl_dir):
             # locate the CRC location
-            with open(os.path.join(cl_dir, cl), 'r') as f:
+            with open(os.path.join(cl_dir, cl), 'rb') as f:
                 f.seek(0)
                 version = struct.unpack('>i', f.read(4))[0]
                 crc_pos = 12
@@ -486,22 +488,22 @@ class TestCommitLog(Tester):
                     crc_pos += 2 + psize
 
             # rewrite it with crap
-            with open(os.path.join(cl_dir, cl), 'w') as f:
+            with open(os.path.join(cl_dir, cl), 'wb') as f:
                 f.seek(crc_pos)
                 f.write(struct.pack('>i', 123456))
 
             # verify said crap
-            with open(os.path.join(cl_dir, cl), 'r') as f:
+            with open(os.path.join(cl_dir, cl), 'rb') as f:
                 f.seek(crc_pos)
                 crc = struct.unpack('>i', f.read(4))[0]
-                self.assertEqual(crc, 123456)
+                assert crc == 123456
 
         mark = node.mark_log()
         node.start()
         node.watch_log_for(expected_error, from_mark=mark)
-        with self.assertRaises(TimeoutError):
+        with pytest.raises(TimeoutError):
             node.wait_for_binary_interface(from_mark=mark, timeout=20)
-        self.assertFalse(node.is_running())
+        assert not node.is_running()
 
     @since('2.2')
     def test_compression_error(self):
@@ -510,13 +512,11 @@ class TestCommitLog(Tester):
         if the commit log header refers to an unknown compression class, and
         the commit_failure_policy is stop, C* shouldn't start up
         """
-        if not hasattr(self, 'ignore_log_patterns'):
-            self.ignore_log_patterns = []
-
         expected_error = 'Could not create Compression for type org.apache.cassandra.io.compress.LZ5Compressor'
-        self.ignore_log_patterns.append(expected_error)
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            expected_error]
         node = self.node1
-        self.assertIsInstance(node, Node)
+        assert isinstance(node, Node)
         node.set_configuration_options({'commit_failure_policy': 'stop',
                                         'commitlog_compression': [{'class_name': 'LZ4Compressor'}],
                                         'commitlog_sync_period_in_ms': 1000})
@@ -530,7 +530,7 @@ class TestCommitLog(Tester):
             cursor.execute("INSERT INTO ks1.tbl (k, v) VALUES ({0}, 
{0})".format(i))
 
         results = list(cursor.execute("SELECT * FROM ks1.tbl"))
-        self.assertEqual(len(results), 10)
+        assert len(results) == 10
 
         # with the commitlog_sync_period_in_ms set to 1000,
         # this sleep guarantees that the commitlog data is
@@ -545,31 +545,37 @@ class TestCommitLog(Tester):
             ks_dir = os.path.join(data_dir, 'ks1')
             db_dir = os.listdir(ks_dir)[0]
             sstables = sstables + len([f for f in os.listdir(os.path.join(ks_dir, db_dir)) if f.endswith('.db')])
-        self.assertEqual(sstables, 0)
+        assert sstables == 0
 
         def get_header_crc(header):
             """
             When calculating the header crc, C* splits up the 8b id, first adding the 4 least significant
             bytes to the crc, then the 5 most significant bytes, so this splits them and calculates the same way
             """
-            new_header = header[:4]
+            new_header = bytearray(header[:4])
             # C* evaluates most and least significant 4 bytes out of order
-            new_header += header[8:12]
-            new_header += header[4:8]
+            new_header.extend(header[8:12])
+            new_header.extend(header[4:8])
             # C* evaluates the short parameter length as an int
-            new_header += '\x00\x00' + header[12:14]  # the
-            new_header += header[14:]
-            return binascii.crc32(new_header)
+            new_header.extend(b'\x00\x00')
+            new_header.extend(header[12:14])  # the
+            new_header.extend(header[14:])
+
+            # https://docs.python.org/2/library/binascii.html
+            # "Changed in version 2.6: The return value is in the range 
[-2**31, 2**31-1] regardless
+            # of platform. In the past the value would be signed on some 
platforms and unsigned on
+            # others. Use & 0xffffffff on the value if you want it to match 
Python 3 behavior."
+            return binascii.crc32(new_header) & 0xffffffff
 
         # modify the compression parameters to look for a compressor that isn't there
         # while this scenario is pretty unlikely, if a jar or lib got moved or something,
         # you'd have a similar situation, which would be fixable by the user
         path = node.get_path()
         cl_dir = os.path.join(path, 'commitlogs')
-        self.assertTrue(len(os.listdir(cl_dir)) > 0)
+        assert len(os.listdir(cl_dir)) > 0
         for cl in os.listdir(cl_dir):
             # read the header and find the crc location
-            with open(os.path.join(cl_dir, cl), 'r') as f:
+            with open(os.path.join(cl_dir, cl), 'rb') as f:
                 f.seek(0)
                 crc_pos = 12
                 f.seek(crc_pos)
@@ -583,29 +589,39 @@ class TestCommitLog(Tester):
                 # check that we're going this right
                 f.seek(0)
                 header_bytes = f.read(header_length)
-                self.assertEqual(get_header_crc(header_bytes), crc)
+
+                # https://docs.python.org/2/library/binascii.html
+                # "Changed in version 2.6: The return value is in the range 
[-2**31, 2**31-1] regardless
+                # of platform. In the past the value would be signed on some 
platforms and unsigned on
+                # others. Use & 0xffffffff on the value if you want it to 
match Python 3 behavior."
+                assert get_header_crc(header_bytes) == (crc & 0xffffffff)
 
             # rewrite it with imaginary compressor
-            self.assertIn('LZ4Compressor', header_bytes)
-            header_bytes = header_bytes.replace('LZ4Compressor', 'LZ5Compressor')
-            self.assertNotIn('LZ4Compressor', header_bytes)
-            self.assertIn('LZ5Compressor', header_bytes)
-            with open(os.path.join(cl_dir, cl), 'w') as f:
+            assert 'LZ4Compressor'.encode("ascii") in header_bytes
+            header_bytes = header_bytes.replace('LZ4Compressor'.encode("ascii"), 'LZ5Compressor'.encode("ascii"))
+            assert 'LZ4Compressor'.encode("ascii") not in header_bytes
+            assert 'LZ5Compressor'.encode("ascii") in header_bytes
+            with open(os.path.join(cl_dir, cl), 'wb') as f:
                 f.seek(0)
                 f.write(header_bytes)
                 f.seek(crc_pos)
-                f.write(struct.pack('>i', get_header_crc(header_bytes)))
+                f.write(struct.pack('>I', get_header_crc(header_bytes)))
 
             # verify we wrote everything correctly
-            with open(os.path.join(cl_dir, cl), 'r') as f:
+            with open(os.path.join(cl_dir, cl), 'rb') as f:
                 f.seek(0)
-                self.assertEqual(f.read(header_length), header_bytes)
+                assert f.read(header_length) == header_bytes
                 f.seek(crc_pos)
                 crc = struct.unpack('>i', f.read(4))[0]
-                self.assertEqual(crc, get_header_crc(header_bytes))
+
+                # https://docs.python.org/2/library/binascii.html
+                # "Changed in version 2.6: The return value is in the range 
[-2**31, 2**31-1] regardless
+                # of platform. In the past the value would be signed on some 
platforms and unsigned on
+                # others. Use & 0xffffffff on the value if you want it to 
match Python 3 behavior."
+                assert (crc & 0xffffffff)  == get_header_crc(header_bytes)
 
         mark = node.mark_log()
         node.start()
         node.watch_log_for(expected_error, from_mark=mark)
-        with self.assertRaises(TimeoutError):
+        with pytest.raises(TimeoutError):
             node.wait_for_binary_interface(from_mark=mark, timeout=20)
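
The CRC hunks above keep quoting the binascii docs because crc32 signedness
differs between Python 2 and Python 3. A minimal standalone sketch of why both
sides of the comparison are masked with 0xffffffff (illustrative only, not
part of the diff):

    import binascii
    import struct

    data = b'LZ4Compressor'
    # On Python 2, crc32 could return a negative (signed) value; the mask
    # yields the unsigned 32-bit value that Python 3 always returns.
    crc = binascii.crc32(data) & 0xffffffff
    # '>I' packs the CRC as unsigned big-endian; reading it back with the
    # signed format '>i' requires the same mask to compare equal.
    packed = struct.pack('>I', crc)
    assert struct.unpack('>i', packed)[0] & 0xffffffff == crc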

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/compaction_test.py
----------------------------------------------------------------------
diff --git a/compaction_test.py b/compaction_test.py
index 999f83c..49a2923 100644
--- a/compaction_test.py
+++ b/compaction_test.py
@@ -5,25 +5,29 @@ import string
 import tempfile
 import time
 from distutils.version import LooseVersion
-
+import pytest
 import parse
+import logging
 
-from dtest import Tester, debug, create_ks
+from dtest import Tester, create_ks
 from tools.assertions import assert_length_equal, assert_none, assert_one
-from tools.decorators import since
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
-class TestCompaction(Tester):
+strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
 
-    __test__ = False
 
-    def setUp(self):
-        Tester.setUp(self)
+class TestCompaction(Tester):
+
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_set_cluster_log_level(self, fixture_dtest_setup):
         # compaction test for version 2.2.2 and above relies on DEBUG log in debug.log
-        self.cluster.set_log_level("DEBUG")
+        fixture_dtest_setup.cluster.set_log_level("DEBUG")
 
-    @since('0', '2.2.X')
-    def compaction_delete_test(self):
+    @pytest.mark.parametrize("strategy", strategies)
+    @since('0', max_version='2.2.X')
+    def test_compaction_delete(self, strategy):
         """
         Test that executing a delete properly tombstones a row.
         Insert data, delete a partition of data and check that the requesite rows are tombstoned.
@@ -35,7 +39,7 @@ class TestCompaction(Tester):
         session = self.patient_cql_connection(node1)
         create_ks(session, 'ks', 1)
 
-        session.execute("create table ks.cf (key int PRIMARY KEY, val int) 
with compaction = {'class':'" + self.strategy + "'} and gc_grace_seconds = 30;")
+        session.execute("create table ks.cf (key int PRIMARY KEY, val int) 
with compaction = {'class':'" + strategy + "'} and gc_grace_seconds = 30;")
 
         for x in range(0, 100):
             session.execute('insert into cf (key, val) values (' + str(x) + ',1)')
@@ -58,9 +62,9 @@ class TestCompaction(Tester):
 
         numfound = jsoninfo.count("markedForDeleteAt")
 
-        self.assertEqual(numfound, 10)
+        assert numfound == 10
 
-    def data_size_test(self):
+    def test_data_size(self):
         """
         Ensure that data size does not have unwarranted increases after compaction.
         Insert data and check data size before and after a compaction.
@@ -80,8 +84,8 @@ class TestCompaction(Tester):
             output = output[output.find("Space used (live)"):]
             initialValue = int(output[output.find(":") + 
1:output.find("\n")].strip())
         else:
-            debug("datasize not found")
-            debug(output)
+            logger.debug("datasize not found")
+            logger.debug(output)
 
         node1.flush()
         node1.compact()
@@ -93,31 +97,32 @@ class TestCompaction(Tester):
             output = output[output.find("Space used (live)"):]
             finalValue = int(output[output.find(":") + 
1:output.find("\n")].strip())
         else:
-            debug("datasize not found")
+            logger.debug("datasize not found")
         # allow 5% size increase - if we have few sstables it is not impossible that live size increases *slightly* after compaction
-        self.assertLess(finalValue, initialValue * 1.05)
+        assert finalValue < initialValue * 1.05
 
-    def bloomfilter_size_test(self):
+    @pytest.mark.parametrize("strategy", strategies)
+    def test_bloomfilter_size(self, strategy):
         """
         @jira_ticket CASSANDRA-11344
         Check that bloom filter size is between 50KB and 100KB for 100K keys
         """
-        if not hasattr(self, 'strategy') or self.strategy == "LeveledCompactionStrategy":
+        if strategy == "LeveledCompactionStrategy":
             strategy_string = 'strategy=LeveledCompactionStrategy,sstable_size_in_mb=1'
             min_bf_size = 40000
             max_bf_size = 100000
         else:
-            if self.strategy == "DateTieredCompactionStrategy":
+            if strategy == "DateTieredCompactionStrategy":
                 strategy_string = "strategy=DateTieredCompactionStrategy,base_time_seconds=86400"  # we want a single sstable, so make sure we don't have a tiny first window
             else:
-                strategy_string = "strategy={}".format(self.strategy)
+                strategy_string = "strategy={}".format(strategy)
             min_bf_size = 100000
             max_bf_size = 150000
         cluster = self.cluster
         cluster.populate(1).start(wait_for_binary_proto=True)
         [node1] = cluster.nodelist()
 
-        for x in xrange(0, 5):
+        for x in range(0, 5):
             node1.stress(['write', 'n=100K', "no-warmup", "cl=ONE", "-rate",
                           "threads=300", "-schema", "replication(factor=1)",
                           
"compaction({},enabled=false)".format(strategy_string)])
@@ -134,36 +139,37 @@ class TestCompaction(Tester):
 
         # in some rare cases we can end up with more than one sstable per data directory with
         # non-lcs strategies (see CASSANDRA-12323)
-        if not hasattr(self, 'strategy') or self.strategy == "LeveledCompactionStrategy":
+        if strategy == "LeveledCompactionStrategy":
             size_factor = 1
         else:
             sstable_count = len(node1.get_sstables('keyspace1', 'standard1'))
             dir_count = len(node1.data_directories())
-            debug("sstable_count is: {}".format(sstable_count))
-            debug("dir_count is: {}".format(dir_count))
+            logger.debug("sstable_count is: {}".format(sstable_count))
+            logger.debug("dir_count is: {}".format(dir_count))
             if node1.get_cassandra_version() < LooseVersion('3.2'):
                 size_factor = sstable_count
             else:
                 size_factor = sstable_count / float(dir_count)
 
-        debug("bloom filter size is: {}".format(bfSize))
-        debug("size factor = {}".format(size_factor))
-        self.assertGreaterEqual(bfSize, size_factor * min_bf_size)
-        self.assertLessEqual(bfSize, size_factor * max_bf_size)
+        logger.debug("bloom filter size is: {}".format(bfSize))
+        logger.debug("size factor = {}".format(size_factor))
+        assert bfSize >= size_factor * min_bf_size
+        assert bfSize <= size_factor * max_bf_size
 
-    def sstable_deletion_test(self):
+    @pytest.mark.parametrize("strategy", strategies)
+    def test_sstable_deletion(self, strategy):
         """
         Test that sstables are deleted properly when able after compaction.
         Insert data setting gc_grace_seconds to 0, and determine sstable
         is deleted upon data deletion.
         """
-        self.skip_if_no_major_compaction()
+        self.skip_if_no_major_compaction(strategy)
         cluster = self.cluster
         cluster.populate(1).start(wait_for_binary_proto=True)
         [node1] = cluster.nodelist()
         session = self.patient_cql_connection(node1)
         create_ks(session, 'ks', 1)
-        session.execute("create table cf (key int PRIMARY KEY, val int) with 
gc_grace_seconds = 0 and compaction= {'class':'" + self.strategy + "'}")
+        session.execute("create table cf (key int PRIMARY KEY, val int) with 
gc_grace_seconds = 0 and compaction= {'class':'" + strategy + "'}")
 
         for x in range(0, 100):
             session.execute('insert into cf (key, val) values (' + str(x) + ',1)')
@@ -180,22 +186,21 @@ class TestCompaction(Tester):
                 cfs = os.listdir(os.path.join(data_dir, "ks"))
                 ssdir = os.listdir(os.path.join(data_dir, "ks", cfs[0]))
                 for afile in ssdir:
-                    self.assertFalse("Data" in afile, afile)
+                    assert not "Data" in afile, afile
 
         except OSError:
             self.fail("Path to sstables not valid.")
 
-    def dtcs_deletion_test(self):
+    @pytest.mark.parametrize("strategy", ['DateTieredCompactionStrategy'])
+    def test_dtcs_deletion(self, strategy):
         """
         Test that sstables are deleted properly when able after compaction with
         DateTieredCompactionStrategy.
         Insert data setting max_sstable_age_days low, and determine sstable
         is deleted upon data deletion past max_sstable_age_days.
         """
-        if not hasattr(self, 'strategy'):
-            self.strategy = 'DateTieredCompactionStrategy'
-        elif self.strategy != 'DateTieredCompactionStrategy':
-            self.skipTest('Not implemented unless DateTieredCompactionStrategy is used')
+        if strategy != 'DateTieredCompactionStrategy':
+            pytest.skip('Not implemented unless DateTieredCompactionStrategy is used')
 
         cluster = self.cluster
         cluster.populate(1).start(wait_for_binary_proto=True)
@@ -215,7 +220,7 @@ class TestCompaction(Tester):
         expected_sstable_count = 1
         if self.cluster.version() > LooseVersion('3.1'):
             expected_sstable_count = cluster.data_dir_count
-        self.assertEqual(len(expired_sstables), expected_sstable_count)
+        assert len(expired_sstables) == expected_sstable_count
         # write a new sstable to make DTCS check for expired sstables:
         for x in range(0, 100):
             session.execute('insert into cf (key, val) values ({}, {})'.format(x, x))
@@ -223,7 +228,7 @@ class TestCompaction(Tester):
         time.sleep(5)
         # we only check every 10 minutes - sstable should still be there:
         for expired_sstable in expired_sstables:
-            self.assertIn(expired_sstable, node1.get_sstables('ks', 'cf'))
+            assert expired_sstable in node1.get_sstables('ks', 'cf')
 
         session.execute("alter table cf with compaction =  
{'class':'DateTieredCompactionStrategy', 'max_sstable_age_days':0.00035, 
'min_threshold':2, 'expired_sstable_check_frequency_seconds':0}")
         time.sleep(1)
@@ -232,9 +237,9 @@ class TestCompaction(Tester):
         node1.flush()
         time.sleep(5)
         for expired_sstable in expired_sstables:
-            self.assertNotIn(expired_sstable, node1.get_sstables('ks', 'cf'))
+            assert expired_sstable not in node1.get_sstables('ks', 'cf')
 
-    def compaction_throughput_test(self):
+    def test_compaction_throughput(self):
         """
         Test setting compaction throughput.
         Set throughput, insert data and ensure compaction performance corresponds.
@@ -277,24 +282,26 @@ class TestCompaction(Tester):
         }
 
         units = ['MB'] if cluster.version() < LooseVersion('3.6') else ['KiB', 'MiB', 'GiB']
-        self.assertIn(found_units, units)
+        assert found_units in units
 
-        debug(avgthroughput)
+        logger.debug(avgthroughput)
         avgthroughput_mb = unit_conversion_dct[found_units] * float(avgthroughput)
 
         # The throughput in the log is computed independantly from the throttling and on the output files while
         # throttling is on the input files, so while that throughput shouldn't be higher than the one set in
         # principle, a bit of wiggle room is expected
-        self.assertGreaterEqual(float(threshold) + 0.5, avgthroughput_mb)
+        assert float(threshold) + 0.5 >= avgthroughput_mb
 
-    def compaction_strategy_switching_test(self):
-        """Ensure that switching strategies does not result in problems.
+    @pytest.mark.parametrize("strategy", strategies)
+    def test_compaction_strategy_switching(self, strategy):
+        """
+        Ensure that switching strategies does not result in problems.
         Insert data, switch strategies, then check against data loss.
         """
         strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
 
-        if self.strategy in strategies:
-            strategies.remove(self.strategy)
+        if strategy in strategies:
+            strategies.remove(strategy)
             cluster = self.cluster
             cluster.populate(1).start(wait_for_binary_proto=True)
             [node1] = cluster.nodelist()
@@ -303,7 +310,7 @@ class TestCompaction(Tester):
                 session = self.patient_cql_connection(node1)
                 create_ks(session, 'ks', 1)
 
-                session.execute("create table ks.cf (key int PRIMARY KEY, val 
int) with gc_grace_seconds = 0 and compaction= {'class':'" + self.strategy + 
"'};")
+                session.execute("create table ks.cf (key int PRIMARY KEY, val 
int) with gc_grace_seconds = 0 and compaction= {'class':'" + strategy + "'};")
 
                 for x in range(0, 100):
                     session.execute('insert into ks.cf (key, val) values (' + str(x) + ',1)')
@@ -326,7 +333,7 @@ class TestCompaction(Tester):
                 time.sleep(5)
                 cluster.start(wait_for_binary_proto=True)
 
-    def large_compaction_warning_test(self):
+    def test_large_compaction_warning(self):
         """
         @jira_ticket CASSANDRA-9643
         Check that we log a warning when the partition size is bigger than compaction_large_partition_warning_threshold_mb
@@ -347,7 +354,7 @@ class TestCompaction(Tester):
 
         ret = list(session.execute("SELECT properties from ks.large where 
userid = 'user'"))
         assert_length_equal(ret, 1)
-        self.assertEqual(200, len(ret[0][0].keys()))
+        assert 200 == len(list(ret[0][0].keys()))
 
         node.flush()
 
@@ -358,9 +365,10 @@ class TestCompaction(Tester):
 
         ret = list(session.execute("SELECT properties from ks.large where 
userid = 'user'"))
         assert_length_equal(ret, 1)
-        self.assertEqual(200, len(ret[0][0].keys()))
+        assert 200 == len(list(ret[0][0].keys()))
 
-    def disable_autocompaction_nodetool_test(self):
+    @pytest.mark.parametrize("strategy", strategies)
+    def test_disable_autocompaction_nodetool(self, strategy):
         """
         Make sure we can enable/disable compaction using nodetool
         """
@@ -369,7 +377,7 @@ class TestCompaction(Tester):
         [node] = cluster.nodelist()
         session = self.patient_cql_connection(node)
         create_ks(session, 'ks', 1)
-        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(self.strategy))
+        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(strategy))
         node.nodetool('disableautocompaction ks to_disable')
         for i in range(1000):
             session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100))
@@ -379,13 +387,14 @@ class TestCompaction(Tester):
             log_file = 'system.log'
         else:
             log_file = 'debug.log'
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
         node.nodetool('enableautocompaction ks to_disable')
         # sleep to allow compactions to start
         time.sleep(2)
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)
 
-    def disable_autocompaction_schema_test(self):
+    @pytest.mark.parametrize("strategy", strategies)
+    def test_disable_autocompaction_schema(self, strategy):
         """
         Make sure we can disable compaction via the schema compaction parameter 'enabled' = false
         """
@@ -394,7 +403,7 @@ class TestCompaction(Tester):
         [node] = cluster.nodelist()
         session = self.patient_cql_connection(node)
         create_ks(session, 'ks', 1)
-        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'false\'}}'.format(self.strategy))
+        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'false\'}}'.format(strategy))
         for i in range(1000):
             session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100))
             if i % 100 == 0:
@@ -404,7 +413,7 @@ class TestCompaction(Tester):
         else:
             log_file = 'debug.log'
 
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
         # should still be disabled after restart:
         node.stop()
         node.start(wait_for_binary_proto=True)
@@ -412,13 +421,14 @@ class TestCompaction(Tester):
         session.execute("use ks")
         # sleep to make sure we dont start any logs
         time.sleep(2)
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
         node.nodetool('enableautocompaction ks to_disable')
         # sleep to allow compactions to start
         time.sleep(2)
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)
 
-    def disable_autocompaction_alter_test(self):
+    @pytest.mark.parametrize("strategy", strategies)
+    def test_disable_autocompaction_alter(self, strategy):
         """
         Make sure we can enable compaction using an alter-statement
         """
@@ -427,8 +437,8 @@ class TestCompaction(Tester):
         [node] = cluster.nodelist()
         session = self.patient_cql_connection(node)
         create_ks(session, 'ks', 1)
-        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(self.strategy))
-        session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'false\'}}'.format(self.strategy))
+        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(strategy))
+        session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'false\'}}'.format(strategy))
         for i in range(1000):
             session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100))
             if i % 100 == 0:
@@ -437,16 +447,17 @@ class TestCompaction(Tester):
             log_file = 'system.log'
         else:
             log_file = 'debug.log'
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy))
-        session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'true\'}}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
+        session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'enabled\':\'true\'}}'.format(strategy))
         # we need to flush atleast once when altering to enable:
         session.execute('insert into to_disable (id, d) values (99, \'hello\')')
         node.flush()
         # sleep to allow compactions to start
         time.sleep(2)
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)
 
-    def disable_autocompaction_alter_and_nodetool_test(self):
+    @pytest.mark.parametrize("strategy", strategies)
+    def test_disable_autocompaction_alter_and_nodetool(self, strategy):
         """
         Make sure compaction stays disabled after an alter statement where we have disabled using nodetool first
         """
@@ -455,7 +466,7 @@ class TestCompaction(Tester):
         [node] = cluster.nodelist()
         session = self.patient_cql_connection(node)
         create_ks(session, 'ks', 1)
-        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(self.strategy))
+        session.execute('CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{\'class\':\'{0}\'}}'.format(strategy))
         node.nodetool('disableautocompaction ks to_disable')
         for i in range(1000):
             session.execute('insert into to_disable (id, d) values ({0}, \'{1}\')'.format(i, 'hello' * 100))
@@ -465,19 +476,19 @@ class TestCompaction(Tester):
             log_file = 'system.log'
         else:
             log_file = 'debug.log'
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(self.strategy))
-        session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'tombstone_threshold\':0.9}}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
+        session.execute('ALTER TABLE to_disable WITH compaction = {{\'class\':\'{0}\', \'tombstone_threshold\':0.9}}'.format(strategy))
         session.execute('insert into to_disable (id, d) values (99, \'hello\')')
         node.flush()
         time.sleep(2)
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found log items for {0}'.format(strategy)
         node.nodetool('enableautocompaction ks to_disable')
         # sleep to allow compactions to start
         time.sleep(2)
-        self.assertTrue(len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(self.strategy))
+        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)
 
     @since('3.7')
-    def user_defined_compaction_test(self):
+    def test_user_defined_compaction(self):
         """
         Test a user defined compaction task by generating a few sstables with cassandra stress
         and autocompaction disabled, and then passing a list of sstable data files directly to nodetool compact.
@@ -499,20 +510,21 @@ class TestCompaction(Tester):
         node1.nodetool('flush keyspace1 standard1')
 
         sstable_files = ' '.join(node1.get_sstable_data_files('keyspace1', 'standard1'))
-        debug('Compacting {}'.format(sstable_files))
+        logger.debug('Compacting {}'.format(sstable_files))
         node1.nodetool('compact --user-defined {}'.format(sstable_files))
 
         sstable_files = node1.get_sstable_data_files('keyspace1', 'standard1')
-        self.assertEquals(len(node1.data_directories()), len(sstable_files),
-                          'Expected one sstable data file per node directory but got {}'.format(sstable_files))
+        assert len(node1.data_directories()) == len(sstable_files), \
+            'Expected one sstable data file per node directory but got {}'.format(sstable_files)
 
+    @pytest.mark.parametrize("strategy", ['LeveledCompactionStrategy'])
     @since('3.10')
-    def fanout_size_test(self):
+    def test_fanout_size(self, strategy):
         """
         @jira_ticket CASSANDRA-11550
         """
-        if not hasattr(self, 'strategy') or self.strategy != 'LeveledCompactionStrategy':
-            self.skipTest('Not implemented unless LeveledCompactionStrategy is used')
+        if strategy != 'LeveledCompactionStrategy':
+            pytest.skip('Not implemented unless LeveledCompactionStrategy is used')
 
         cluster = self.cluster
         cluster.populate(1).start(wait_for_binary_proto=True)
@@ -522,7 +534,7 @@ class TestCompaction(Tester):
         node1.nodetool('disableautocompaction')
 
         session = self.patient_cql_connection(node1)
-        debug("Altering compaction strategy to LCS")
+        logger.debug("Altering compaction strategy to LCS")
         session.execute("ALTER TABLE keyspace1.standard1 with 
compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 
'fanout_size':10};")
 
         stress_write(node1, keycount=1000000)
@@ -538,9 +550,9 @@ class TestCompaction(Tester):
         # [0, ?/10, ?, 0, 0, 0...]
         p = re.compile(r'0,\s(\d+)/10,.*')
         m = p.search(output)
-        self.assertEqual(10 * len(node1.data_directories()), int(m.group(1)))
+        assert 10 * len(node1.data_directories()) == int(m.group(1))
 
-        debug("Altering the fanout_size")
+        logger.debug("Altering the fanout_size")
         session.execute("ALTER TABLE keyspace1.standard1 with 
compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 
'fanout_size':5};")
 
         # trigger the compaction
@@ -551,12 +563,12 @@ class TestCompaction(Tester):
         # [0, ?/5, ?/25, ?, 0, 0...]
         p = re.compile(r'0,\s(\d+)/5,\s(\d+)/25,.*')
         m = p.search(output)
-        self.assertEqual(5 * len(node1.data_directories()), int(m.group(1)))
-        self.assertEqual(25 * len(node1.data_directories()), int(m.group(2)))
+        assert 5 * len(node1.data_directories()) == int(m.group(1))
+        assert 25 * len(node1.data_directories()) == int(m.group(2))
 
-    def skip_if_no_major_compaction(self):
-        if self.cluster.version() < '2.2' and self.strategy == 'LeveledCompactionStrategy':
-            self.skipTest('major compaction not implemented for LCS in this version of Cassandra')
+    def skip_if_no_major_compaction(self, strategy):
+        if self.cluster.version() < '2.2' and strategy == 'LeveledCompactionStrategy':
+            pytest.skip(msg='major compaction not implemented for LCS in this version of Cassandra')
 
 
 def grep_sstables_in_each_level(node, table_name):
@@ -567,14 +579,8 @@ def grep_sstables_in_each_level(node, table_name):
 
 
 def get_random_word(wordLen, population=string.ascii_letters + string.digits):
-    return ''.join([random.choice(population) for _ in range(wordLen)])
+    return ''.join([random.choice(population) for _ in range(int(wordLen))])
 
 
 def stress_write(node, keycount=100000):
     node.stress(['write', 'n={keycount}'.format(keycount=keycount)])
-
-
-strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
-for strategy in strategies:
-    cls_name = ('TestCompaction_with_' + strategy)
-    vars()[cls_name] = type(cls_name, (TestCompaction,), {'strategy': strategy, '__test__': True})
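
The removed loop above generated one TestCompaction_with_<strategy> subclass
per strategy at import time; @pytest.mark.parametrize now covers the same
matrix. A minimal sketch of the pattern (class and test names illustrative,
not part of the diff):

    import pytest

    strategies = ['LeveledCompactionStrategy',
                  'SizeTieredCompactionStrategy',
                  'DateTieredCompactionStrategy']

    class TestParametrized:

        @pytest.mark.parametrize("strategy", strategies)
        def test_strategy(self, strategy):
            # pytest runs this once per list entry, so a single method
            # replaces the three dynamically generated subclasses
            assert strategy.endswith('CompactionStrategy')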

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/compression_test.py
----------------------------------------------------------------------
diff --git a/compression_test.py b/compression_test.py
index c8362c9..d865ba2 100644
--- a/compression_test.py
+++ b/compression_test.py
@@ -1,9 +1,13 @@
 import os
+import pytest
+import logging
 
 from dtest import create_ks
 from scrub_test import TestHelper
 from tools.assertions import assert_crc_check_chance_equal
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class TestCompression(TestHelper):
@@ -16,10 +20,10 @@ class TestCompression(TestHelper):
 
         with open(file, 'rb') as fh:
             file_start = fh.read(2)
-            return types.get(file_start.encode('hex'), 'UNKNOWN')
+            return types.get(file_start.hex(), 'UNKNOWN')
 
     @since("3.0")
-    def disable_compression_cql_test(self):
+    def test_disable_compression_cql(self):
         """
         @jira_ticket CASSANDRA-8384
         using new cql create table syntax to disable compression
@@ -33,7 +37,7 @@ class TestCompression(TestHelper):
         session.execute("create table disabled_compression_table (id uuid 
PRIMARY KEY ) WITH compression = {'enabled': false};")
         session.cluster.refresh_schema_metadata()
         meta = 
session.cluster.metadata.keyspaces['ks'].tables['disabled_compression_table']
-        self.assertEqual('false', meta.options['compression']['enabled'])
+        assert 'false' == meta.options['compression']['enabled']
 
         for n in range(0, 100):
             session.execute("insert into disabled_compression_table (id) 
values (uuid());")
@@ -44,12 +48,12 @@ class TestCompression(TestHelper):
         for sstable_path in sstable_paths:
             sstable = os.path.join(sstable_path, 
sstables['disabled_compression_table'][1])
             if os.path.exists(sstable):
-                self.assertEqual('NONE', self._get_compression_type(sstable))
+                assert 'NONE' == self._get_compression_type(sstable)
                 found = True
-        self.assertTrue(found)
+        assert found
 
     @since("3.0")
-    def compression_cql_options_test(self):
+    def test_compression_cql_options(self):
         """
         @jira_ticket CASSANDRA-8384
         using new cql create table syntax to configure compression
@@ -72,12 +76,12 @@ class TestCompression(TestHelper):
 
         session.cluster.refresh_schema_metadata()
         meta = 
session.cluster.metadata.keyspaces['ks'].tables['compression_opts_table']
-        self.assertEqual('org.apache.cassandra.io.compress.DeflateCompressor', 
meta.options['compression']['class'])
-        self.assertEqual('256', 
meta.options['compression']['chunk_length_in_kb'])
+        assert 'org.apache.cassandra.io.compress.DeflateCompressor' == 
meta.options['compression']['class']
+        assert '256' == meta.options['compression']['chunk_length_in_kb']
         assert_crc_check_chance_equal(session, "compression_opts_table", 0.25)
 
         warn = node.grep_log("The option crc_check_chance was deprecated as a 
compression option.")
-        self.assertEqual(len(warn), 0)
+        assert len(warn) == 0
         session.execute("""
             alter table compression_opts_table
                 WITH compression = {
@@ -87,13 +91,13 @@ class TestCompression(TestHelper):
                 }
             """)
         warn = node.grep_log("The option crc_check_chance was deprecated as a 
compression option.")
-        self.assertEqual(len(warn), 1)
+        assert len(warn) == 1
 
         # check metadata again after crc_check_chance_update
         session.cluster.refresh_schema_metadata()
         meta = 
session.cluster.metadata.keyspaces['ks'].tables['compression_opts_table']
-        self.assertEqual('org.apache.cassandra.io.compress.DeflateCompressor', 
meta.options['compression']['class'])
-        self.assertEqual('256', 
meta.options['compression']['chunk_length_in_kb'])
+        assert 'org.apache.cassandra.io.compress.DeflateCompressor' == 
meta.options['compression']['class']
+        assert '256' == meta.options['compression']['chunk_length_in_kb']
         assert_crc_check_chance_equal(session, "compression_opts_table", 0.6)
 
         for n in range(0, 100):
@@ -105,12 +109,12 @@ class TestCompression(TestHelper):
         for sstable_path in sstable_paths:
             sstable = os.path.join(sstable_path, 
sstables['compression_opts_table'][1])
             if os.path.exists(sstable):
-                self.assertEqual('DEFLATE', 
self._get_compression_type(sstable))
+                assert 'DEFLATE' == self._get_compression_type(sstable)
                 found = True
-        self.assertTrue(found)
+        assert found
 
     @since("3.0")
-    def compression_cql_disabled_with_alter_test(self):
+    def test_compression_cql_disabled_with_alter(self):
         """
         @jira_ticket CASSANDRA-8384
         starting with compression enabled then disabling it
@@ -131,17 +135,17 @@ class TestCompression(TestHelper):
                 AND crc_check_chance = 0.25;
             """)
         meta = 
session.cluster.metadata.keyspaces['ks'].tables['start_enabled_compression_table']
-        self.assertEqual('org.apache.cassandra.io.compress.SnappyCompressor', 
meta.options['compression']['class'])
-        self.assertEqual('256', 
meta.options['compression']['chunk_length_in_kb'])
+        assert 'org.apache.cassandra.io.compress.SnappyCompressor' == 
meta.options['compression']['class']
+        assert '256' == meta.options['compression']['chunk_length_in_kb']
         assert_crc_check_chance_equal(session, 
"start_enabled_compression_table", 0.25)
         session.execute("alter table start_enabled_compression_table with 
compression = {'enabled': false};")
 
         session.cluster.refresh_schema_metadata()
         meta = 
session.cluster.metadata.keyspaces['ks'].tables['start_enabled_compression_table']
-        self.assertEqual('false', meta.options['compression']['enabled'])
+        assert 'false' == meta.options['compression']['enabled']
 
     @since("3.0")
-    def compression_cql_enabled_with_alter_test(self):
+    def test_compression_cql_enabled_with_alter(self):
         """
         @jira_ticket CASSANDRA-8384
         starting with compression disabled and enabling it
@@ -154,7 +158,7 @@ class TestCompression(TestHelper):
         create_ks(session, 'ks', 1)
         session.execute("create table start_disabled_compression_table (id 
uuid PRIMARY KEY ) WITH compression = {'enabled': false};")
         meta = 
session.cluster.metadata.keyspaces['ks'].tables['start_disabled_compression_table']
-        self.assertEqual('false', meta.options['compression']['enabled'])
+        assert 'false' == meta.options['compression']['enabled']
         session.execute("""alter table start_disabled_compression_table
                                 WITH compression = {
                                         'class': 'SnappyCompressor',
@@ -163,6 +167,6 @@ class TestCompression(TestHelper):
 
         session.cluster.refresh_schema_metadata()
         meta = 
session.cluster.metadata.keyspaces['ks'].tables['start_disabled_compression_table']
-        self.assertEqual('org.apache.cassandra.io.compress.SnappyCompressor', 
meta.options['compression']['class'])
-        self.assertEqual('256', 
meta.options['compression']['chunk_length_in_kb'])
+        assert 'org.apache.cassandra.io.compress.SnappyCompressor' == 
meta.options['compression']['class']
+        assert '256' == meta.options['compression']['chunk_length_in_kb']
         assert_crc_check_chance_equal(session, 
"start_disabled_compression_table", 0.25)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/concurrent_schema_changes_test.py
----------------------------------------------------------------------
diff --git a/concurrent_schema_changes_test.py 
b/concurrent_schema_changes_test.py
index 49041f9..d0af49c 100644
--- a/concurrent_schema_changes_test.py
+++ b/concurrent_schema_changes_test.py
@@ -3,15 +3,19 @@ import os
 import pprint
 import re
 import time
+import pytest
+import logging
+
 from random import randrange
 from threading import Thread
-from unittest import skip
 
 from cassandra.concurrent import execute_concurrent
 from ccmlib.node import Node
 
-from dtest import Tester, debug, create_ks
-from tools.decorators import since
+from dtest import Tester, create_ks
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 def wait(delay=2):
@@ -21,7 +25,7 @@ def wait(delay=2):
     time.sleep(delay)
 
 
-@skip('awaiting CASSANDRA-10699')
+@pytest.mark.skip(reason='awaiting CASSANDRA-10699')
 class TestConcurrentSchemaChanges(Tester):
     allow_log_errors = True
 
@@ -29,7 +33,7 @@ class TestConcurrentSchemaChanges(Tester):
         """
         prepares for schema changes by creating a keyspace and column family.
         """
-        debug("prepare_for_changes() " + str(namespace))
+        logger.debug("prepare_for_changes() " + str(namespace))
         # create a keyspace that will be used
         create_ks(session, "ks_%s" % namespace, 2)
         session.execute('USE ks_%s' % namespace)
@@ -77,7 +81,7 @@ class TestConcurrentSchemaChanges(Tester):
         rebuild index (via jmx)
         set default_validation_class
         """
-        debug("make_schema_changes() " + str(namespace))
+        logger.debug("make_schema_changes() " + str(namespace))
         session.execute('USE ks_%s' % namespace)
         # drop keyspace
         session.execute('DROP KEYSPACE ks2_%s' % namespace)
@@ -117,14 +121,14 @@ class TestConcurrentSchemaChanges(Tester):
 
     def validate_schema_consistent(self, node):
         """ Makes sure that there is only one schema """
-        debug("validate_schema_consistent() " + node.name)
+        logger.debug("validate_schema_consistent() " + node.name)
 
         response = node.nodetool('describecluster').stdout
         schemas = response.split('Schema versions:')[1].strip()
         num_schemas = len(re.findall('\[.*?\]', schemas))
-        self.assertEqual(num_schemas, 1, "There were multiple schema versions: 
{}".format(pprint.pformat(schemas)))
+        assert num_schemas == 1, "There were multiple schema versions: {}".format(pprint.pformat(schemas))
 
-    def create_lots_of_tables_concurrently_test(self):
+    def test_create_lots_of_tables_concurrently(self):
         """
         create tables across multiple threads concurrently
         """
@@ -141,18 +145,18 @@ class TestConcurrentSchemaChanges(Tester):
         results = execute_concurrent(session, cmds, raise_on_first_error=True, 
concurrency=200)
 
         for (success, result) in results:
-            self.assertTrue(success, "didn't get success on table create: 
{}".format(result))
+            assert success, "didn't get success on table create: 
{}".format(result)
 
         wait(10)
 
         session.cluster.refresh_schema_metadata()
         table_meta = session.cluster.metadata.keyspaces["lots_o_tables"].tables
-        self.assertEqual(250, len(table_meta))
+        assert 250 == len(table_meta)
         self.validate_schema_consistent(node1)
         self.validate_schema_consistent(node2)
         self.validate_schema_consistent(node3)
 
-    def create_lots_of_alters_concurrently_test(self):
+    def test_create_lots_of_alters_concurrently(self):
         """
         create alters across multiple threads concurrently
         """
@@ -169,26 +173,26 @@ class TestConcurrentSchemaChanges(Tester):
 
         cmds = [("alter table base_{0} add c_{1} int".format(randrange(0, 10), 
n), ()) for n in range(500)]
 
-        debug("executing 500 alters")
+        logger.debug("executing 500 alters")
         results = execute_concurrent(session, cmds, raise_on_first_error=True, 
concurrency=150)
 
         for (success, result) in results:
-            self.assertTrue(success, "didn't get success on table create: 
{}".format(result))
+            assert success, "didn't get success on table create: 
{}".format(result)
 
-        debug("waiting for alters to propagate")
+        logger.debug("waiting for alters to propagate")
         wait(30)
 
         session.cluster.refresh_schema_metadata()
         table_meta = session.cluster.metadata.keyspaces["lots_o_alters"].tables
-        column_ct = sum([len(table.columns) for table in table_meta.values()])
+        column_ct = sum([len(table.columns) for table in 
list(table_meta.values())])
 
         # primary key + alters
-        self.assertEqual(510, column_ct)
+        assert 510 == column_ct
         self.validate_schema_consistent(node1)
         self.validate_schema_consistent(node2)
         self.validate_schema_consistent(node3)
 
-    def create_lots_of_indexes_concurrently_test(self):
+    def test_create_lots_of_indexes_concurrently(self):
         """
         create indexes across multiple threads concurrently
         """
@@ -205,7 +209,7 @@ class TestConcurrentSchemaChanges(Tester):
                 session.execute("insert into base_{0} (id, c1, c2) values 
(uuid(), {1}, {2})".format(n, ins, ins))
         wait(5)
 
-        debug("creating indexes")
+        logger.debug("creating indexes")
         cmds = []
         for n in range(5):
             cmds.append(("create index ix_base_{0}_c1 on base_{0} 
(c1)".format(n), ()))
@@ -214,31 +218,31 @@ class TestConcurrentSchemaChanges(Tester):
         results = execute_concurrent(session, cmds, raise_on_first_error=True)
 
         for (success, result) in results:
-            self.assertTrue(success, "didn't get success on table create: 
{}".format(result))
+            assert success, "didn't get success on table create: 
{}".format(result)
 
         wait(5)
 
-        debug("validating schema and index list")
+        logger.debug("validating schema and index list")
         session.cluster.control_connection.wait_for_schema_agreement()
         session.cluster.refresh_schema_metadata()
         index_meta = 
session.cluster.metadata.keyspaces["lots_o_indexes"].indexes
         self.validate_schema_consistent(node1)
         self.validate_schema_consistent(node2)
-        self.assertEqual(10, len(index_meta))
+        assert 10 == len(index_meta)
         for n in range(5):
-            self.assertIn("ix_base_{0}_c1".format(n), index_meta)
-            self.assertIn("ix_base_{0}_c2".format(n), index_meta)
+            assert "ix_base_{0}_c1".format(n) in index_meta
+            assert "ix_base_{0}_c2".format(n) in index_meta
 
-        debug("waiting for indexes to fill in")
+        logger.debug("waiting for indexes to fill in")
         wait(45)
-        debug("querying all values by secondary index")
+        logger.debug("querying all values by secondary index")
         for n in range(5):
             for ins in range(1000):
-                self.assertEqual(1, len(list(session.execute("select * from 
base_{0} where c1 = {1}".format(n, ins)))))
-                self.assertEqual(1, len(list(session.execute("select * from 
base_{0} where c2 = {1}".format(n, ins)))))
+                assert 1 == len(list(session.execute("select * from base_{0} 
where c1 = {1}".format(n, ins))))
+                assert 1 == len(list(session.execute("select * from base_{0} where c2 = {1}".format(n, ins))))
 
     @since('3.0')
-    def create_lots_of_mv_concurrently_test(self):
+    def test_create_lots_of_mv_concurrently(self):
         """
         create materialized views across multiple threads concurrently
         """
@@ -261,15 +265,15 @@ class TestConcurrentSchemaChanges(Tester):
                              "WHERE c{0} IS NOT NULL AND id IS NOT NULL 
PRIMARY KEY (c{0}, id)".format(n)))
             session.cluster.control_connection.wait_for_schema_agreement()
 
-        debug("waiting for indexes to fill in")
+        logger.debug("waiting for indexes to fill in")
         wait(60)
         result = list(session.execute(("SELECT * FROM system_schema.views "
                                        "WHERE keyspace_name='lots_o_views' AND 
base_table_name='source_data' ALLOW FILTERING")))
-        self.assertEqual(10, len(result), "missing some mv from source_data 
table")
+        assert 10 == len(result), "missing some mv from source_data table"
 
         for n in range(1, 11):
             result = list(session.execute("select * from 
src_by_c{0}".format(n)))
-            self.assertEqual(4000, len(result))
+            assert 4000 == len(result)
 
     def _do_lots_of_schema_actions(self, session):
         for n in range(20):
@@ -287,7 +291,7 @@ class TestConcurrentSchemaChanges(Tester):
 
         results = execute_concurrent(session, cmds, concurrency=100, 
raise_on_first_error=True)
         for (success, result) in results:
-            self.assertTrue(success, "didn't get success: {}".format(result))
+            assert success, "didn't get success: {}".format(result)
 
     def _verify_lots_of_schema_actions(self, session):
         session.cluster.control_connection.wait_for_schema_agreement()
@@ -302,7 +306,7 @@ class TestConcurrentSchemaChanges(Tester):
         table_meta = session.cluster.metadata.keyspaces["lots_o_churn"].tables
         errors = []
         for n in range(20):
-            self.assertTrue("new_table_{0}".format(n) in table_meta)
+            assert "new_table_{0}".format(n) in table_meta
 
             if 7 != len(table_meta["index_me_{0}".format(n)].indexes):
                 errors.append("index_me_{0} expected indexes 
ix_index_me_c0->7, got: {1}".format(n, 
sorted(list(table_meta["index_me_{0}".format(n)].indexes))))
@@ -313,9 +317,9 @@ class TestConcurrentSchemaChanges(Tester):
             if 8 != len(altered.columns):
                 errors.append("alter_me_{0} expected c1 -> c7, id, got: 
{1}".format(n, sorted(list(altered.columns))))
 
-        self.assertTrue(0 == len(errors), "\n".join(errors))
+        assert 0 == len(errors), "\n".join(errors)
 
-    def create_lots_of_schema_churn_test(self):
+    def test_create_lots_of_schema_churn(self):
         """
         create tables, indexes, alters across multiple threads concurrently
         """
@@ -327,11 +331,11 @@ class TestConcurrentSchemaChanges(Tester):
         session.execute("use lots_o_churn")
 
         self._do_lots_of_schema_actions(session)
-        debug("waiting for things to settle and sync")
+        logger.debug("waiting for things to settle and sync")
         wait(60)
         self._verify_lots_of_schema_actions(session)
 
-    def create_lots_of_schema_churn_with_node_down_test(self):
+    def test_create_lots_of_schema_churn_with_node_down(self):
         """
         create tables, indexes, alters across multiple threads concurrently 
with a node down
         """
@@ -346,15 +350,15 @@ class TestConcurrentSchemaChanges(Tester):
         self._do_lots_of_schema_actions(session)
         wait(15)
         node2.start(wait_other_notice=True)
-        debug("waiting for things to settle and sync")
+        logger.debug("waiting for things to settle and sync")
         wait(120)
         self._verify_lots_of_schema_actions(session)
 
-    def basic_test(self):
+    def test_basic(self):
         """
         make several schema changes on the same node.
         """
-        debug("basic_test()")
+        logger.debug("basic_test()")
 
         cluster = self.cluster
         cluster.populate(2).start()
@@ -366,8 +370,8 @@ class TestConcurrentSchemaChanges(Tester):
 
         self.make_schema_changes(session, namespace='ns1')
 
-    def changes_to_different_nodes_test(self):
-        debug("changes_to_different_nodes_test()")
+    def test_changes_to_different_nodes(self):
+        logger.debug("changes_to_different_nodes_test()")
         cluster = self.cluster
         cluster.populate(2).start()
         node1, node2 = cluster.nodelist()
@@ -389,13 +393,13 @@ class TestConcurrentSchemaChanges(Tester):
         # check both, just because we can
         self.validate_schema_consistent(node2)
 
-    def changes_while_node_down_test(self):
+    def test_changes_while_node_down(self):
         """
         makes schema changes while a node is down.
         Make schema changes to node 1 while node 2 is down.
         Then bring up 2 and make sure it gets the changes.
         """
-        debug("changes_while_node_down_test()")
+        logger.debug("changes_while_node_down_test()")
         cluster = self.cluster
         cluster.populate(2).start()
         node1, node2 = cluster.nodelist()
@@ -414,7 +418,7 @@ class TestConcurrentSchemaChanges(Tester):
         wait(20)
         self.validate_schema_consistent(node1)
 
-    def changes_while_node_toggle_test(self):
+    def test_changes_while_node_toggle(self):
         """
         makes schema changes while a node is down.
 
@@ -422,7 +426,7 @@ class TestConcurrentSchemaChanges(Tester):
         Bring down 2, bring up 1, and finally bring up 2.
         1 should get the changes.
         """
-        debug("changes_while_node_toggle_test()")
+        logger.debug("changes_while_node_toggle_test()")
         cluster = self.cluster
         cluster.populate(2).start()
         node1, node2 = cluster.nodelist()
@@ -441,8 +445,8 @@ class TestConcurrentSchemaChanges(Tester):
         wait(20)
         self.validate_schema_consistent(node1)
 
-    def decommission_node_test(self):
-        debug("decommission_node_test()")
+    def test_decommission_node(self):
+        logger.debug("decommission_node_test()")
         cluster = self.cluster
 
         cluster.populate(1)
@@ -490,8 +494,8 @@ class TestConcurrentSchemaChanges(Tester):
         wait(30)
         self.validate_schema_consistent(node1)
 
-    def snapshot_test(self):
-        debug("snapshot_test()")
+    def test_snapshot(self):
+        logger.debug("snapshot_test()")
         cluster = self.cluster
         cluster.populate(2).start()
         node1, node2 = cluster.nodelist()
@@ -535,11 +539,11 @@ class TestConcurrentSchemaChanges(Tester):
         wait(2)
         self.validate_schema_consistent(node1)
 
-    def load_test(self):
+    def test_load(self):
         """
         apply schema changes while the cluster is under load.
         """
-        debug("load_test()")
+        logger.debug("load_test()")
 
         cluster = self.cluster
         cluster.populate(1).start()
@@ -548,14 +552,14 @@ class TestConcurrentSchemaChanges(Tester):
         session = self.cql_connection(node1)
 
         def stress(args=[]):
-            debug("Stressing")
+            logger.debug("Stressing")
             node1.stress(args)
-            debug("Done Stressing")
+            logger.debug("Done Stressing")
 
         def compact():
-            debug("Compacting...")
+            logger.debug("Compacting...")
             node1.nodetool('compact')
-            debug("Done Compacting.")
+            logger.debug("Done Compacting.")
 
         # put some data into the cluster
         stress(['write', 'n=30000', 'no-warmup', '-rate', 'threads=8'])
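
A recurring hazard in this file's conversion: self.assertEqual(a, b, msg)
takes three arguments, but the pytest form must be written `assert a == b,
msg`. Keeping unittest's comma placement produces `assert a, (b == msg)`,
which can never fail for truthy a. A minimal illustration of the two forms:

    def check_schema_count(num_schemas):
        # Correct conversion of assertEqual(num_schemas, 1, msg):
        assert num_schemas == 1, "There were multiple schema versions"

    # Broken form: parses as `assert num_schemas, (1 == "...")`, so the
    # comparison becomes the (falsy) failure message and the assert passes
    # for any nonzero num_schemas:
    # assert num_schemas, 1 == "There were multiple schema versions"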

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/configuration_test.py
----------------------------------------------------------------------
diff --git a/configuration_test.py b/configuration_test.py
index 2f696eb..6bb5e95 100644
--- a/configuration_test.py
+++ b/configuration_test.py
@@ -1,16 +1,30 @@
 import os
-
+import logging
 import parse
+import pytest
+
 from cassandra.concurrent import execute_concurrent_with_args
 
-from dtest import Tester, debug, create_ks
+from tools.misc import ImmutableMapping
+from dtest_setup_overrides import DTestSetupOverrides
+from dtest import Tester, create_ks
 from tools.jmxutils import (JolokiaAgent, make_mbean,
                             remove_perf_disable_shared_mem)
 
+logger = logging.getLogger(__name__)
+
+
[email protected]()
+def fixture_dtest_setup_overrides(request):
+    dtest_setup_overrides = DTestSetupOverrides()
+    if request.node.name == "test_change_durable_writes":
+        dtest_setup_overrides.cluster_options = 
ImmutableMapping({'commitlog_segment_size_in_mb': 1})
+    return dtest_setup_overrides
+
 
 class TestConfiguration(Tester):
 
-    def compression_chunk_length_test(self):
+    def test_compression_chunk_length(self):
         """ Verify the setting of compression chunk_length [#3558]"""
         cluster = self.cluster
 
@@ -20,7 +34,9 @@ class TestConfiguration(Tester):
         create_ks(session, 'ks', 1)
 
         create_table_query = "CREATE TABLE test_table (row varchar, name 
varchar, value int, PRIMARY KEY (row, name));"
-        alter_chunk_len_query = "ALTER TABLE test_table WITH compression = 
{{'sstable_compression' : 'SnappyCompressor', 'chunk_length_kb' : 
{chunk_length}}};"
+        alter_chunk_len_query = "ALTER TABLE test_table WITH " \
+                                "compression = {{'sstable_compression' : 
'SnappyCompressor', " \
+                                "'chunk_length_kb' : {chunk_length}}};"
 
         session.execute(create_table_query)
 
@@ -30,7 +46,8 @@ class TestConfiguration(Tester):
         session.execute(alter_chunk_len_query.format(chunk_length=64))
         self._check_chunk_length(session, 64)
 
-    def change_durable_writes_test(self):
+    @pytest.mark.timeout(60*30)
+    def test_change_durable_writes(self):
         """
         @jira_ticket CASSANDRA-9560
 
@@ -51,15 +68,14 @@ class TestConfiguration(Tester):
         """
         def new_commitlog_cluster_node():
             # writes should block on commitlog fsync
-            self.cluster.populate(1)
-            node = self.cluster.nodelist()[0]
-            
self.cluster.set_configuration_options(values={'commitlog_segment_size_in_mb': 
1})
-            self.cluster.set_batch_commitlog(enabled=True)
+            self.fixture_dtest_setup.cluster.populate(1)
+            node = self.fixture_dtest_setup.cluster.nodelist()[0]
+            self.fixture_dtest_setup.cluster.set_batch_commitlog(enabled=True)
 
             # disable JVM option so we can use Jolokia
-            # this has to happen after .set_configuration_options because of 
implmentation details
+            # this has to happen after .set_configuration_options because of 
implementation details
             remove_perf_disable_shared_mem(node)
-            self.cluster.start(wait_for_binary_proto=True)
+            self.fixture_dtest_setup.cluster.start(wait_for_binary_proto=True)
             return node
 
         durable_node = new_commitlog_cluster_node()
@@ -70,16 +86,15 @@ class TestConfiguration(Tester):
         durable_session.execute("CREATE KEYSPACE ks WITH REPLICATION = 
{'class': 'SimpleStrategy', 'replication_factor': 1} "
                                 "AND DURABLE_WRITES = true")
         durable_session.execute('CREATE TABLE ks.tab (key int PRIMARY KEY, a 
int, b int, c int)')
-        debug('commitlog size diff = ' + str(commitlog_size(durable_node) - 
durable_init_size))
+        logger.debug('commitlog size diff = ' + 
str(commitlog_size(durable_node) - durable_init_size))
         write_to_trigger_fsync(durable_session, 'ks', 'tab')
 
-        self.assertGreater(commitlog_size(durable_node), durable_init_size,
-                           msg='This test will not work in this environment; '
-                               'write_to_trigger_fsync does not trigger 
fsync.')
+        assert commitlog_size(durable_node) > durable_init_size, \
+            "This test will not work in this environment; 
write_to_trigger_fsync does not trigger fsync."
 
         # get a fresh cluster to work on
-        self.tearDown()
-        self.setUp()
+        durable_session.shutdown()
+        self.fixture_dtest_setup.cleanup_and_replace_cluster()
 
         node = new_commitlog_cluster_node()
         init_size = commitlog_size(node)
@@ -91,8 +106,7 @@ class TestConfiguration(Tester):
         session.execute('CREATE TABLE ks.tab (key int PRIMARY KEY, a int, b 
int, c int)')
         session.execute('ALTER KEYSPACE ks WITH DURABLE_WRITES=true')
         write_to_trigger_fsync(session, 'ks', 'tab')
-        self.assertGreater(commitlog_size(node), init_size,
-                           msg='ALTER KEYSPACE was not respected')
+        assert commitlog_size(node) > init_size, "ALTER KEYSPACE was not 
respected"
 
     def overlapping_data_folders(self):
         """
@@ -130,12 +144,13 @@ class TestConfiguration(Tester):
             if 'compression' in result:
                 params = result
 
-        self.assertNotEqual(params, '', "Looking for the string 
'sstable_compression', but could not find it in {str}".format(str=result))
+        assert params != '', "Looking for the string 'sstable_compression', but could not find " \
+                             "it in {str}".format(str=result)
 
         chunk_string = "chunk_length_kb" if self.cluster.version() < '3.0' 
else "chunk_length_in_kb"
         chunk_length = parse.search("'" + chunk_string + "': 
'{chunk_length:d}'", result).named['chunk_length']
 
-        self.assertEqual(chunk_length, value, "Expected chunk_length: {}.  We 
got: {}".format(value, chunk_length))
+        assert chunk_length == value, "Expected chunk_length: {}.  We got: 
{}".format(value, chunk_length)
 
 
 def write_to_trigger_fsync(session, ks, table):
@@ -145,9 +160,17 @@ def write_to_trigger_fsync(session, ks, table):
     commitlog_segment_size_in_mb is 1. Assumes the table's columns are
     (key int, a int, b int, c int).
     """
+    """
+    From https://github.com/datastax/python-driver/pull/877/files
+      "Note: in the case that `generators` are used, it is important to ensure 
the consumers do not
+       block or attempt further synchronous requests, because no further IO 
will be processed until
+       the consumer returns. This may also produce a deadlock in the IO event 
thread."
+    """
     execute_concurrent_with_args(session,
-                                 session.prepare('INSERT INTO "{ks}"."{table}" 
(key, a, b, c) VALUES (?, ?, ?, ?)'.format(ks=ks, table=table)),
-                                 ((x, x + 1, x + 2, x + 3) for x in 
range(50000)))
+                                 session.prepare('INSERT INTO "{ks}"."{table}" 
(key, a, b, c) '
+                                                 'VALUES (?, ?, ?, 
?)'.format(ks=ks, table=table)),
+                                 ((x, x + 1, x + 2, x + 3)
+                                 for x in range(50000)), concurrency=5)
 
 
 def commitlog_size(node):
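
The bounded concurrency added to write_to_trigger_fsync follows from the
driver note quoted in its comment: when the parameter sequence is a
generator, the driver's IO thread pulls new items as requests complete, so
consumers must not block while it does. A sketch of the call shape in
isolation, assuming an open session and the ks.tab schema created by the
test:

    from cassandra.concurrent import execute_concurrent_with_args

    def bulk_insert(session):
        insert = session.prepare(
            'INSERT INTO "ks"."tab" (key, a, b, c) VALUES (?, ?, ?, ?)')
        # A generator keeps 50k parameter tuples from materializing at once;
        # the small concurrency cap limits how hard the event thread drives
        # the generator, per the python-driver note on generator consumers.
        params = ((x, x + 1, x + 2, x + 3) for x in range(50000))
        execute_concurrent_with_args(session, insert, params, concurrency=5)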

