http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/cqlsh_tests/cqlsh_tests.py
----------------------------------------------------------------------
diff --git a/cqlsh_tests/cqlsh_tests.py b/cqlsh_tests/cqlsh_tests.py
index bf2b90c..1d6e96e 100644
--- a/cqlsh_tests/cqlsh_tests.py
+++ b/cqlsh_tests/cqlsh_tests.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 import binascii
 import csv
 import datetime
@@ -6,6 +5,9 @@ import os
 import re
 import subprocess
 import sys
+import logging
+
+import pytest
 from decimal import Decimal
 from distutils.version import LooseVersion
 from tempfile import NamedTemporaryFile
@@ -16,15 +18,16 @@ from cassandra.concurrent import 
execute_concurrent_with_args
 from cassandra.query import BatchStatement, BatchType
 from ccmlib import common
 
-from cqlsh_tools import monkeypatch_driver, unmonkeypatch_driver
-from dtest import Tester, debug, create_ks, create_cf
+from .cqlsh_tools import monkeypatch_driver, unmonkeypatch_driver
+from dtest import Tester, create_ks, create_cf
 from tools.assertions import assert_all, assert_none
 from tools.data import create_c1c2_table, insert_c1c2, rows_to_list
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class TestCqlsh(Tester):
-    maxDiff = None
 
     @classmethod
     def setUpClass(cls):
@@ -59,13 +62,13 @@ class TestCqlsh(Tester):
 
         cmds = ['pycodestyle', '--ignore', 'E501,E402,E731', cqlsh_path] + 
cqlshlib_paths
 
-        debug(cmds)
+        logger.debug(cmds)
 
         p = subprocess.Popen(cmds, stdout=subprocess.PIPE, 
stderr=subprocess.PIPE)
         stdout, stderr = p.communicate()
 
-        self.assertEqual(len(stdout), 0, stdout)
-        self.assertEqual(len(stderr), 0, stderr)
+        assert len(stdout) == 0, stdout
+        assert len(stderr) == 0, stderr
 
     def test_simple_insert(self):
 
@@ -114,7 +117,7 @@ class TestCqlsh(Tester):
                 stmt=repr(stmt),
                 routput=repr(output)
             )
-            self.assertIn(expected_substring, output, msg=msg)
+            assert expected_substring in output, msg
 
         assert_applied("INSERT INTO lwt.lwt (id, value) VALUES (1, 'one') IF 
NOT EXISTS")
         assert_applied("INSERT INTO lwt.lwt (id, value) VALUES (1, 'one') IF 
NOT EXISTS")
@@ -141,11 +144,11 @@ class TestCqlsh(Tester):
         output, err = self.run_cqlsh(node1, 'use simple; SELECT * FROM 
simpledate')
 
         if self.cluster.version() >= LooseVersion('3.4'):
-            self.assertIn("2143-04-19 11:21:01.000000+0000", output)
-            self.assertIn("1943-04-19 11:21:01.000000+0000", output)
+            assert "2143-04-19 11:21:01.000000+0000" in output
+            assert "1943-04-19 11:21:01.000000+0000" in output
         else:
-            self.assertIn("2143-04-19 11:21:01+0000", output)
-            self.assertIn("1943-04-19 11:21:01+0000", output)
+            assert "2143-04-19 11:21:01+0000" in output
+            assert "1943-04-19 11:21:01+0000" in output
 
     @since('3.4')
     def test_sub_second_precision(self):
@@ -168,27 +171,27 @@ class TestCqlsh(Tester):
         output, err, _ = node1.run_cqlsh(cmds="use simple; SELECT * FROM 
testsubsecond "
                                          "WHERE id = 1 AND subid = '1943-06-19 
11:21:01.123+0000'")
 
-        debug(output)
-        self.assertIn("1943-06-19 11:21:01.123000+0000", output)
-        self.assertNotIn("1943-06-19 11:21:01.000000+0000", output)
+        logger.debug(output)
+        assert "1943-06-19 11:21:01.123000+0000" in output
+        assert "1943-06-19 11:21:01.000000+0000" not in output
 
         output, err, _ = node1.run_cqlsh(cmds="use simple; SELECT * FROM 
testsubsecond "
                                          "WHERE id = 2 AND subid = '1943-06-19 
11:21:01+0000'")
 
-        debug(output)
-        self.assertIn("1943-06-19 11:21:01.000000+0000", output)
-        self.assertNotIn("1943-06-19 11:21:01.123000+0000", output)
+        logger.debug(output)
+        assert "1943-06-19 11:21:01.000000+0000" in output
+        assert "1943-06-19 11:21:01.123000+0000" not in output
 
     def verify_glass(self, node):
         session = self.patient_cql_connection(node)
 
         def verify_varcharmap(map_name, expected, encode_value=False):
-            rows = list(session.execute((u"SELECT %s FROM 
testks.varcharmaptable WHERE varcharkey= '᚛᚛ᚉᚑᚅ
ᚔᚉᚉᚔᚋ ᚔᚈᚔ ᚍᚂᚐᚅᚑ ᚅᚔᚋᚌᚓᚅᚐ᚜';" % 
map_name).encode("utf-8")))
+            rows = list(session.execute(("SELECT %s FROM 
testks.varcharmaptable WHERE varcharkey= '᚛᚛ᚉᚑᚅ
ᚔᚉᚉᚔᚋ ᚔᚈᚔ ᚍᚂᚐᚅᚑ ᚅᚔᚋᚌᚓᚅᚐ᚜';" % 
map_name).encode("utf-8")))
             if encode_value:
-                got = {k.encode("utf-8"): v.encode("utf-8") for k, v in 
rows[0][0].iteritems()}
+                got = {k.encode("utf-8"): v.encode("utf-8") for k, v in 
rows[0][0].items()}
             else:
-                got = {k.encode("utf-8"): v for k, v in rows[0][0].iteritems()}
-            self.assertEqual(got, expected)
+                got = {k.encode("utf-8"): v for k, v in rows[0][0].items()}
+            assert got == expected
 
         verify_varcharmap('varcharasciimap', {
             'Vitrum edere possum, mihi non nocet.': 'Hello',
@@ -297,9 +300,9 @@ class TestCqlsh(Tester):
 
         output, err = self.run_cqlsh(node, 'use testks; SELECT * FROM 
varcharmaptable', ['--encoding=utf-8'])
 
-        self.assertEquals(output.count('Можам да јадам 
стакло, а не ме штета.'), 16)
-        self.assertEquals(output.count(' ⠊⠀⠉⠁⠝⠀⠑⠁⠞⠀⠛⠇â 
â Žâ Žâ €â â â ™â €â Šâ žâ €â ™â •⠑⠎⠝⠞⠀⠓⠥⠗⠞⠀⠍⠑'), 16)
-        self.assertEquals(output.count('᚛᚛ᚉᚑᚅ
ᚔᚉᚉᚔᚋ ᚔᚈᚔ ᚍᚂᚐᚅᚑ ᚅᚔᚋᚌᚓᚅᚐ᚜'), 2)
+        assert output.decode("utf-8").count('Можам да јадам 
стакло, а не ме штета.') == 16
+        assert output.decode("utf-8").count(' ⠊⠀⠉⠁⠝⠀⠑⠁⠞⠀â 
›â ‡â â Žâ Žâ €â â â ™â €â Šâ žâ €â ™â •⠑⠎⠝⠞⠀⠓⠥⠗⠞⠀⠍â 
‘') == 16
+        assert output.decode("utf-8").count('᚛᚛ᚉᚑᚅ
ᚔᚉᚉᚔᚋ ᚔᚈᚔ ᚍᚂᚐᚅᚑ ᚅᚔᚋᚌᚓᚅᚐ᚜') == 2
 
     def test_eat_glass(self):
 
@@ -308,7 +311,7 @@ class TestCqlsh(Tester):
 
         node1, = self.cluster.nodelist()
 
-        node1.run_cqlsh(cmds=u"""create KEYSPACE testks WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': 1};
+        node1.run_cqlsh(cmds="""create KEYSPACE testks WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': 1};
 use testks;
 
 CREATE TABLE varcharmaptable (
@@ -439,16 +442,15 @@ UPDATE varcharmaptable SET varcharvarintmap['Vitrum edere 
possum, mihi non nocet
         Ensure that syntax errors involving unicode are handled correctly.
         @jira_ticket CASSANDRA-11626
         """
-
         self.cluster.populate(1)
         self.cluster.start(wait_for_binary_proto=True)
 
         node1, = self.cluster.nodelist()
 
-        output, err, _ = node1.run_cqlsh(cmds=u"ä;".encode('utf8'))
+        output, err, _ = node1.run_cqlsh(cmds="ä;".encode('utf8'))
         err = err.decode('utf8')
-        self.assertIn(u'Invalid syntax', err)
-        self.assertIn(u'ä', err)
+        assert 'Invalid syntax' in err
+        assert 'ä' in err
 
     @since('2.2')
     def test_unicode_invalid_request_error(self):
@@ -461,12 +463,12 @@ UPDATE varcharmaptable SET varcharvarintmap['Vitrum edere 
possum, mihi non nocet
 
         node1, = self.cluster.nodelist()
 
-        cmd = u'''create keyspace "ä" WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': '1'};'''
+        cmd = '''create keyspace "ä" WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': '1'};'''
         cmd = cmd.encode('utf8')
         output, err, _ = node1.run_cqlsh(cmds=cmd, cqlsh_options=["--debug"])
 
         err = err.decode('utf8')
-        self.assertIn(u'"ä" is not a valid keyspace name', err)
+        assert '"ä" is not a valid keyspace name' in err
 
     def test_with_empty_values(self):
         """
@@ -477,7 +479,7 @@ UPDATE varcharmaptable SET varcharvarintmap['Vitrum edere 
possum, mihi non nocet
 
         node1, = self.cluster.nodelist()
 
-        node1.run_cqlsh(cmds=u"""create keyspace  CASSANDRA_7196 WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1} ;
+        node1.run_cqlsh(cmds="""create keyspace  CASSANDRA_7196 WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1} ;
 
 use CASSANDRA_7196;
 
@@ -536,7 +538,7 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
 
         output, err = self.run_cqlsh(node1, "select intcol, bigintcol, 
varintcol from CASSANDRA_7196.has_all_types where num in (0, 1, 2, 3, 4)")
         if common.is_win():
-            output = output.replace('\r', '')
+            output = output.decode("utf-8").replace('\r', '')
 
         expected = """
  intcol      | bigintcol            | varintcol
@@ -547,9 +549,9 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
  -2147483648 | -9223372036854775808 | -10000000000000000000000000
              |                      |                            \n\n(5 
rows)"""
 
-        self.assertTrue(expected in output, "Output \n {%s} \n doesn't contain 
expected\n {%s}" % (output, expected))
+        assert expected in output, "Output \n {%s} \n doesn't contain 
expected\n {%s}" % (output, expected)
 
-    def tracing_from_system_traces_test(self):
+    def test_tracing_from_system_traces(self):
         self.cluster.populate(1).start(wait_for_binary_proto=True)
 
         node1, = self.cluster.nodelist()
@@ -562,15 +564,15 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
         insert_c1c2(session, n=100)
 
         out, err = self.run_cqlsh(node1, 'TRACING ON; SELECT * FROM ks.cf')
-        self.assertIn('Tracing session: ', out)
+        assert 'Tracing session: ' in out
 
         out, err = self.run_cqlsh(node1, 'TRACING ON; SELECT * FROM 
system_traces.events')
-        self.assertNotIn('Tracing session: ', out)
+        assert 'Tracing session: ' not in out
 
         out, err = self.run_cqlsh(node1, 'TRACING ON; SELECT * FROM 
system_traces.sessions')
-        self.assertNotIn('Tracing session: ', out)
+        assert 'Tracing session: ' not in out
 
-    def select_element_inside_udt_test(self):
+    def test_select_element_inside_udt(self):
         self.cluster.populate(1).start()
 
         node1, = self.cluster.nodelist()
@@ -601,7 +603,7 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
             """)
 
         out, err = self.run_cqlsh(node1, "SELECT name.lastname FROM ks.users 
WHERE id=62c36092-82a1-3a00-93d1-46196ee77204")
-        self.assertNotIn('list index out of range', err)
+        assert 'list index out of range' not in err
         # If this assertion fails check CASSANDRA-7891
 
     def verify_output(self, query, node, expected):
@@ -609,10 +611,10 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
         if common.is_win():
             output = output.replace('\r', '')
 
-        self.assertEqual(len(err), 0, "Failed to execute cqlsh: 
{}".format(err))
+        assert len(err) == 0, "Failed to execute cqlsh: {}".format(err)
 
-        debug(output)
-        self.assertTrue(expected in output, "Output \n {%s} \n doesn't contain 
expected\n {%s}" % (output, expected))
+        logger.debug(output)
+        assert expected in output, "Output \n {%s} \n doesn't contain 
expected\n {%s}" % (output, expected)
 
     def test_list_queries(self):
         config = {'authenticator': 
'org.apache.cassandra.auth.PasswordAuthenticator',
@@ -695,8 +697,8 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
 
         # Describe keyspaces
         output = self.execute(cql="DESCRIBE KEYSPACES")
-        self.assertIn("test", output)
-        self.assertIn("system", output)
+        assert "test" in output
+        assert "system" in output
 
         # Describe keyspace
         self.execute(cql="DESCRIBE KEYSPACE test", 
expected_output=self.get_keyspace_output())
@@ -775,8 +777,8 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
                         "'min_threshold': 10, 'max_threshold': 100 }")
         describe_cmd = 'DESCRIBE ks.tab'
         stdout, _ = self.run_cqlsh(node, describe_cmd)
-        self.assertIn("'min_threshold': '10'", stdout)
-        self.assertIn("'max_threshold': '100'", stdout)
+        assert "'min_threshold': '10'" in stdout
+        assert "'max_threshold': '100'" in stdout
 
     def test_describe_on_non_reserved_keywords(self):
         """
@@ -791,8 +793,8 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
         session.execute("CREATE TABLE map (key int PRIMARY KEY, val text)")
         describe_cmd = 'USE ks; DESCRIBE map'
         out, err = self.run_cqlsh(node, describe_cmd)
-        self.assertEqual("", err)
-        self.assertIn("CREATE TABLE ks.map (", out)
+        assert "" == err
+        assert "CREATE TABLE ks.map (" in out
 
     @since('3.0')
     def test_describe_mv(self):
@@ -813,7 +815,7 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
                 """)
 
         output = self.execute(cql="DESCRIBE KEYSPACE test")
-        self.assertIn("users_by_state", output)
+        assert "users_by_state" in output
 
         self.execute(cql='DESCRIBE MATERIALIZED VIEW test.users_by_state', 
expected_output=self.get_users_by_state_mv_output())
         self.execute(cql='DESCRIBE test.users_by_state', 
expected_output=self.get_users_by_state_mv_output())
@@ -1045,7 +1047,7 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
                """
 
     def execute(self, cql, expected_output=None, expected_err=None, 
env_vars=None):
-        debug(cql)
+        logger.debug(cql)
         node1, = self.cluster.nodelist()
         output, err = self.run_cqlsh(node1, cql, env_vars=env_vars)
 
@@ -1055,7 +1057,7 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
                 self.check_response(err, expected_err)
                 return
             else:
-                self.assertTrue(False, err)
+                assert False, err
 
         if expected_output:
             self.check_response(output, expected_output)
@@ -1065,7 +1067,7 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
     def check_response(self, response, expected_response):
         lines = [s.strip() for s in response.split("\n") if s.strip()]
         expected_lines = [s.strip() for s in expected_response.split("\n") if 
s.strip()]
-        self.assertEqual(expected_lines, lines)
+        assert expected_lines == lines
 
     def test_copy_to(self):
         self.cluster.populate(1).start()
@@ -1089,20 +1091,20 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
         results = list(session.execute("SELECT * FROM testcopyto"))
 
         self.tempfile = NamedTemporaryFile(delete=False)
-        debug('Exporting to csv file: %s' % (self.tempfile.name,))
+        logger.debug('Exporting to csv file: %s' % (self.tempfile.name,))
         node1.run_cqlsh(cmds="COPY ks.testcopyto TO '%s'" % 
(self.tempfile.name,))
 
         # session
         with open(self.tempfile.name, 'r') as csvfile:
             csvreader = csv.reader(csvfile)
-            result_list = [map(str, cql_row) for cql_row in results]
-            self.assertItemsEqual(result_list, csvreader)
+            result_list = [list(map(str, cql_row)) for cql_row in results]
+            assert result_list == list(csvreader)
 
         # import the CSV file with COPY FROM
         session.execute("TRUNCATE ks.testcopyto")
         node1.run_cqlsh(cmds="COPY ks.testcopyto FROM '%s'" % 
(self.tempfile.name,))
         new_results = list(session.execute("SELECT * FROM testcopyto"))
-        self.assertItemsEqual(results, new_results)
+        assert results == new_results
 
     def test_float_formatting(self):
         """ Tests for CASSANDRA-9224, check format of float and double 
values"""
@@ -1298,7 +1300,7 @@ VALUES (4, blobAsInt(0x), '', blobAsBigint(0x), 0x, 
blobAsBoolean(0x), blobAsDec
             INSERT INTO values (part, val1, val2, val3, val4) VALUES ('min', 
%d, %d, -32768, -128);
             INSERT INTO values (part, val1, val2, val3, val4) VALUES ('max', 
%d, %d, 32767, 127)""" % (-1 << 31, -1 << 63, (1 << 31) - 1, (1 << 63) - 1))
 
-        self.assertEqual(len(stderr), 0, "Failed to execute cqlsh: 
{}".format(stderr))
+        assert len(stderr) == 0, "Failed to execute cqlsh: {}".format(stderr)
 
         self.verify_output("select * from int_checks.values", node1, """
  part | val1        | val2                 | val3   | val4
@@ -1340,7 +1342,7 @@ CREATE TABLE int_checks.values (
                                         % (datetime.MINYEAR - 1, 
datetime.MINYEAR, datetime.MAXYEAR, datetime.MAXYEAR + 1,))
         # outside the MIN and MAX range it should print the number of days 
from the epoch
 
-        self.assertEqual(len(stderr), 0, "Failed to execute cqlsh: 
{}".format(stderr))
+        assert len(stderr) == 0, "Failed to execute cqlsh: {}".format(stderr)
 
         self.verify_output("select * from datetime_checks.values", node1, """
  d          | t
@@ -1381,7 +1383,7 @@ CREATE TABLE datetime_checks.values (
             INSERT INTO test (id, val) VALUES (2, 'lkjlk');
             INSERT INTO test (id, val) VALUES (3, 'iuiou')""")
 
-        self.assertEqual(len(stderr), 0, "Failed to execute cqlsh: 
{}".format(stderr))
+        assert len(stderr) == 0, "Failed to execute cqlsh: {}".format(stderr)
 
         self.verify_output("use tracing_checks; tracing on; select * from 
test", node1, """Now Tracing is enabled
 
@@ -1425,7 +1427,7 @@ Tracing session:""")
             USE client_warnings;
             CREATE TABLE test (id int, val text, PRIMARY KEY (id))""")
 
-        self.assertEqual(len(stderr), 0, "Failed to execute cqlsh: 
{}".format(stderr))
+        assert len(stderr) == 0, "Failed to execute cqlsh: {}".format(stderr)
 
         session = self.patient_cql_connection(node1)
         prepared = session.prepare("INSERT INTO client_warnings.test (id, val) 
VALUES (?, 'abc')")
@@ -1433,24 +1435,24 @@ Tracing session:""")
         batch_without_warning = BatchStatement(batch_type=BatchType.UNLOGGED)
         batch_with_warning = BatchStatement(batch_type=BatchType.UNLOGGED)
 
-        for i in xrange(max_partitions_per_batch + 1):
+        for i in range(max_partitions_per_batch + 1):
             batch_with_warning.add(prepared, (i,))
             if i < max_partitions_per_batch:
                 batch_without_warning.add(prepared, (i,))
 
         fut = session.execute_async(batch_without_warning)
         fut.result()  # wait for batch to complete before checking warnings
-        self.assertIsNone(fut.warnings)
+        assert fut.warnings is None
 
         fut = session.execute_async(batch_with_warning)
         fut.result()  # wait for batch to complete before checking warnings
-        debug(fut.warnings)
-        self.assertIsNotNone(fut.warnings)
-        self.assertEquals(1, len(fut.warnings))
-        self.assertEquals("Unlogged batch covering {} partitions detected 
against table [client_warnings.test]. "
-                          .format(max_partitions_per_batch + 1) +
-                          "You should use a logged batch for atomicity, or 
asynchronous writes for performance.",
-                          fut.warnings[0])
+        logger.debug(fut.warnings)
+        assert fut.warnings is not None
+        assert 1 == len(fut.warnings)
+        assert "Unlogged batch covering {} partitions detected against table 
[client_warnings.test]. "\
+                   .format(max_partitions_per_batch + 1) + "You should use a 
logged batch for atomicity, " \
+                                                           "or asynchronous 
writes for performance." \
+               == fut.warnings[0]
 
     def test_connect_timeout(self):
         """
@@ -1462,7 +1464,7 @@ Tracing session:""")
         node1, = self.cluster.nodelist()
 
         stdout, stderr = self.run_cqlsh(node1, cmds='USE system', 
cqlsh_options=['--debug', '--connect-timeout=10'])
-        self.assertTrue("Using connect timeout: 10 seconds" in stderr)
+        assert "Using connect timeout: 10 seconds" in stderr
 
     def test_update_schema_with_down_node(self):
         """
@@ -1481,12 +1483,12 @@ Tracing session:""")
         stdout, stderr = self.run_cqlsh(node1, cmds="""
               CREATE KEYSPACE training WITH 
replication={'class':'SimpleStrategy','replication_factor':1};
               DESCRIBE KEYSPACES""", cqlsh_options=cqlsh_opts)
-        self.assertIn("training", stdout)
+        assert "training" in stdout
 
         stdout, stderr = self.run_cqlsh(node1, """USE training;
                                                   CREATE TABLE mytable (id 
int, val text, PRIMARY KEY (id));
                                                   describe tables""", 
cqlsh_options=cqlsh_opts)
-        self.assertIn("mytable", stdout)
+        assert "mytable" in stdout
 
     def test_describe_round_trip(self):
         """
@@ -1514,7 +1516,7 @@ Tracing session:""")
 
         session.execute('DROP TABLE test_ks.lcs_describe')
 
-        create_statement = 'USE test_ks; ' + ' 
'.join(describe_out.splitlines())
+        create_statement = 'USE test_ks; ' + ' 
'.join(describe_out.decode("utf-8").splitlines())
         create_out, create_err = self.run_cqlsh(node1, create_statement)
 
         # these statements shouldn't fall down
@@ -1522,7 +1524,7 @@ Tracing session:""")
         session.execute('INSERT INTO lcs_describe (key) VALUES (1)')
 
         # the table created before and after should be the same
-        self.assertEqual(reloaded_describe_out, describe_out)
+        assert reloaded_describe_out.decode("utf-8") == 
describe_out.decode("utf-8")
 
     @since('3.0')
     def test_materialized_view(self):
@@ -1550,31 +1552,39 @@ Tracing session:""")
         session.execute(insert_stmt + "('user4', 'ch@ngem3d', 'm', 'TX', 
1974);")
 
         describe_out, err = self.run_cqlsh(node1, 'DESCRIBE MATERIALIZED VIEW 
test.users_by_state')
-        self.assertEqual(0, len(err), err)
+        describe_out_str = describe_out.decode("utf-8")
+        err_str = err.decode("utf-8")
+        assert 0 == len(err_str), err_str
 
         select_out, err = self.run_cqlsh(node1, "SELECT * FROM 
test.users_by_state")
-        self.assertEqual(0, len(err), err)
-        debug(select_out)
+        err_str = err.decode("utf-8")
+        assert 0 == len(err_str), err_str
+        logger.debug(select_out)
 
         out, err = self.run_cqlsh(node1, "DROP MATERIALIZED VIEW 
test.users_by_state; DESCRIBE KEYSPACE test; DESCRIBE table test.users")
-        self.assertEqual(0, len(err), err)
-        self.assertNotIn("CREATE MATERIALIZED VIEW users_by_state", out)
+        err_str = err.decode("utf-8")
+        assert 0 == len(err_str), err_str
+        assert "CREATE MATERIALIZED VIEW users_by_state" not in out.decode("utf-8")
 
         out, err = self.run_cqlsh(node1, 'DESCRIBE MATERIALIZED VIEW 
test.users_by_state')
-        self.assertEqual(0, len(out.strip()), out)
-        self.assertIn("Materialized view 'users_by_state' not found", err)
+        describe_out_str = describe_out.decode("utf-8")
+        assert 0 == len(describe_out_str.strip()), describe_out_str
+        assert "Materialized view 'users_by_state' not found" in err.decode("utf-8")
 
-        create_statement = 'USE test; ' + ' 
'.join(describe_out.splitlines()).strip()[:-1]
+        create_statement = 'USE test; ' + ' 
'.join(describe_out_str.splitlines()).strip()[:-1]
         out, err = self.run_cqlsh(node1, create_statement)
-        self.assertEqual(0, len(err), err)
+        err_str = err.decode("utf-8")
+        assert 0 == len(err_str), err_str
 
         reloaded_describe_out, err = self.run_cqlsh(node1, 'DESCRIBE 
MATERIALIZED VIEW test.users_by_state')
-        self.assertEqual(0, len(err), err)
-        self.assertEqual(describe_out, reloaded_describe_out)
+        err_str = err.decode("utf-8")
+        assert 0 == len(err_str), err_str
+        assert describe_out_str == reloaded_describe_out.decode("utf-8")
 
         reloaded_select_out, err = self.run_cqlsh(node1, "SELECT * FROM 
test.users_by_state")
-        self.assertEqual(0, len(err), err)
-        self.assertEqual(select_out, reloaded_select_out)
+        err_str = err.decode("utf-8")
+        assert 0 == len(err_str), err_str
+        assert select_out == reloaded_select_out
 
     @since('3.0')
     def test_clear(self):
@@ -1615,11 +1625,11 @@ Tracing session:""")
         node1, = self.cluster.nodelist()
 
         out, err = self.run_cqlsh(node1, cmd, env_vars={'TERM': 'xterm'})
-        self.assertEqual("", err)
+        assert "" == err
 
         # Can't check escape sequence on cmd prompt. Assume no errors is good 
enough metric.
         if not common.is_win():
-            self.assertTrue(re.search(chr(27) + "\[[0,1,2]?J", out))
+            assert re.search(chr(27) + "\[[0,1,2]?J", out)
 
     def test_batch(self):
         """
@@ -1635,8 +1645,8 @@ Tracing session:""")
             CREATE TABLE excelsior.data (id int primary key);
             BEGIN BATCH INSERT INTO excelsior.data (id) VALUES (0); APPLY 
BATCH""")
 
-        self.assertEqual(0, len(stderr), stderr)
-        self.assertEqual(0, len(stdout), stdout)
+        assert 0 == len(stderr), stderr
+        assert 0 == len(stdout), stdout
 
     def run_cqlsh(self, node, cmds, cqlsh_options=None, env_vars=None):
         if env_vars is None:
@@ -1663,13 +1673,13 @@ Tracing session:""")
         return p.communicate()
 
 
-class CqlshSmokeTest(Tester):
+class TestCqlshSmoke(Tester):
     """
     Tests simple use cases for clqsh.
     """
 
-    def setUp(self):
-        super(CqlshSmokeTest, self).setUp()
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_cluster_setup(self, fixture_dtest_setup):
         self.cluster.populate(1).start(wait_for_binary_proto=True)
         [self.node1] = self.cluster.nodelist()
         self.session = self.patient_cql_connection(self.node1)
@@ -1682,22 +1692,22 @@ class CqlshSmokeTest(Tester):
         create_cf(self.session, 'test', key_type='uuid', columns={'i': 'int'})
 
         out, err, _ = self.node1.run_cqlsh("INSERT INTO ks.test (key) VALUES 
(uuid())")
-        self.assertEqual(err, "")
+        assert err == ""
 
         result = list(self.session.execute("SELECT key FROM ks.test"))
-        self.assertEqual(len(result), 1)
-        self.assertEqual(len(result[0]), 1)
-        self.assertIsInstance(result[0][0], UUID)
+        assert len(result) == 1
+        assert len(result[0]) == 1
+        assert isinstance(result[0][0], UUID)
 
         out, err, _ = self.node1.run_cqlsh("INSERT INTO ks.test (key) VALUES 
(uuid())")
-        self.assertEqual(err, "")
+        assert err == ""
 
         result = list(self.session.execute("SELECT key FROM ks.test"))
-        self.assertEqual(len(result), 2)
-        self.assertEqual(len(result[0]), 1)
-        self.assertEqual(len(result[1]), 1)
-        self.assertIsInstance(result[0][0], UUID)
-        self.assertIsInstance(result[1][0], UUID)
+        assert len(result) == 2
+        assert len(result[0]) == 1
+        assert len(result[1]) == 1
+        assert isinstance(result[0][0], UUID)
+        assert isinstance(result[1][0], UUID)
         self.assertNotEqual(result[0][0], result[1][0])
 
     def test_commented_lines(self):
@@ -1713,8 +1723,8 @@ class CqlshSmokeTest(Tester):
              * comment */
             """)
         out, err, _ = self.node1.run_cqlsh("DESCRIBE KEYSPACE ks; // post-line 
comment")
-        self.assertEqual(err, "")
-        self.assertTrue(out.strip().startswith("CREATE KEYSPACE ks"))
+        assert err == ""
+        assert out.strip().startswith("CREATE KEYSPACE ks")
 
     def test_colons_in_string_literals(self):
         create_ks(self.session, 'ks', 1)
@@ -1725,7 +1735,7 @@ class CqlshSmokeTest(Tester):
             INSERT INTO ks.test (key) VALUES ('Cassandra:TheMovie');
             """)
         assert_all(self.session, "SELECT key FROM test",
-                   [[u'Cassandra:TheMovie']])
+                   [['Cassandra:TheMovie']])
 
     def test_select(self):
         create_ks(self.session, 'ks', 1)
@@ -1733,31 +1743,31 @@ class CqlshSmokeTest(Tester):
 
         self.session.execute("INSERT INTO ks.test (key, c, v) VALUES ('a', 
'a', 'a')")
         assert_all(self.session, "SELECT key, c, v FROM test",
-                   [[u'a', u'a', u'a']])
+                   [['a', 'a', 'a']])
 
         out, err, _ = self.node1.run_cqlsh("SELECT key, c, v FROM ks.test")
         out_lines = [x.strip() for x in out.split("\n")]
 
         # there should be only 1 row returned & it should contain the inserted 
values
-        self.assertIn("(1 rows)", out_lines)
-        self.assertIn("a | a | a", out_lines)
-        self.assertEqual(err, '')
+        assert "(1 rows)" in out_lines
+        assert "a | a | a" in out_lines
+        assert err == ''
 
     def test_insert(self):
         create_ks(self.session, 'ks', 1)
         create_cf(self.session, 'test')
 
         self.node1.run_cqlsh("INSERT INTO ks.test (key, c, v) VALUES ('a', 
'a', 'a')")
-        assert_all(self.session, "SELECT key, c, v FROM test", [[u"a", u"a", 
u"a"]])
+        assert_all(self.session, "SELECT key, c, v FROM test", [["a", "a", 
"a"]])
 
     def test_update(self):
         create_ks(self.session, 'ks', 1)
         create_cf(self.session, 'test')
 
         self.session.execute("INSERT INTO test (key, c, v) VALUES ('a', 'a', 
'a')")
-        assert_all(self.session, "SELECT key, c, v FROM test", [[u"a", u"a", 
u"a"]])
+        assert_all(self.session, "SELECT key, c, v FROM test", [["a", "a", 
"a"]])
         self.node1.run_cqlsh("UPDATE ks.test SET v = 'b' WHERE key = 'a' AND c 
= 'a'")
-        assert_all(self.session, "SELECT key, c, v FROM test", [[u"a", u"a", 
u"b"]])
+        assert_all(self.session, "SELECT key, c, v FROM test", [["a", "a", 
"b"]])
 
     def test_delete(self):
         create_ks(self.session, 'ks', 1)
@@ -1769,12 +1779,12 @@ class CqlshSmokeTest(Tester):
         self.session.execute("INSERT INTO test (key) VALUES ('d')")
         self.session.execute("INSERT INTO test (key) VALUES ('e')")
         assert_all(self.session, 'SELECT key from test',
-                   [[u'a'], [u'c'], [u'e'], [u'd'], [u'b']])
+                   [['a'], ['c'], ['e'], ['d'], ['b']])
 
         self.node1.run_cqlsh("DELETE FROM ks.test WHERE key = 'c'")
 
         assert_all(self.session, 'SELECT key from test',
-                   [[u'a'], [u'e'], [u'd'], [u'b']])
+                   [['a'], ['e'], ['d'], ['b']])
 
     def test_batch(self):
         create_ks(self.session, 'ks', 1)
@@ -1790,27 +1800,27 @@ class CqlshSmokeTest(Tester):
             ''')
         # make sure everything inserted is actually there
         assert_all(self.session, 'SELECT key FROM ks.test',
-                   [[u'eggs'], [u'spam'], [u'sausage']])
+                   [['eggs'], ['spam'], ['sausage']])
 
     def test_create_keyspace(self):
-        self.assertNotIn(u'created', self.get_keyspace_names())
+        assert 'created' not in self.get_keyspace_names()
 
         self.node1.run_cqlsh("CREATE KEYSPACE created WITH replication = { 
'class' : 'SimpleStrategy', 'replication_factor' : 1 }")
-        self.assertIn(u'created', self.get_keyspace_names())
+        assert 'created' in self.get_keyspace_names()
 
     def test_drop_keyspace(self):
         create_ks(self.session, 'ks', 1)
-        self.assertIn(u'ks', self.get_keyspace_names())
+        assert 'ks' in self.get_keyspace_names()
 
         self.node1.run_cqlsh('DROP KEYSPACE ks')
 
-        self.assertNotIn(u'ks', self.get_keyspace_names())
+        assert 'ks' not in self.get_keyspace_names()
 
     def test_create_table(self):
         create_ks(self.session, 'ks', 1)
 
         self.node1.run_cqlsh('CREATE TABLE ks.test (i int PRIMARY KEY);')
-        self.assertEquals(self.get_tables_in_keyspace('ks'), [u'test'])
+        assert self.get_tables_in_keyspace('ks') == ['test']
 
     def test_drop_table(self):
         create_ks(self.session, 'ks', 1)
@@ -1821,7 +1831,7 @@ class CqlshSmokeTest(Tester):
         self.node1.run_cqlsh('DROP TABLE ks.test;')
         self.session.cluster.refresh_schema_metadata()
 
-        self.assertEqual(0, 
len(self.session.cluster.metadata.keyspaces['ks'].tables))
+        assert 0 == len(self.session.cluster.metadata.keyspaces['ks'].tables)
 
     def test_truncate(self):
         create_ks(self.session, 'ks', 1)
@@ -1833,10 +1843,10 @@ class CqlshSmokeTest(Tester):
         self.session.execute("INSERT INTO test (key) VALUES ('d')")
         self.session.execute("INSERT INTO test (key) VALUES ('e')")
         assert_all(self.session, 'SELECT key from test',
-                   [[u'a'], [u'c'], [u'e'], [u'd'], [u'b']])
+                   [['a'], ['c'], ['e'], ['d'], ['b']])
 
         self.node1.run_cqlsh('TRUNCATE ks.test;')
-        self.assertEqual([], rows_to_list(self.session.execute('SELECT * from 
test')))
+        assert [] == rows_to_list(self.session.execute('SELECT * from test'))
 
     @since('2.0', max_version='2.2')
     def test_alter_table(self):
@@ -1846,20 +1856,18 @@ class CqlshSmokeTest(Tester):
         def get_ks_columns():
             table = 
self.session.cluster.metadata.keyspaces['ks'].tables['test']
 
-            return [[table.name, column.name, column.cql_type] for column in 
table.columns.values()]
+            return [[table.name, column.name, column.cql_type] for column in 
list(table.columns.values())]
 
-        old_column_spec = [u'test', u'i',
-                           u'ascii']
-        self.assertIn(old_column_spec, get_ks_columns())
+        old_column_spec = ['test', 'i',
+                           'ascii']
+        assert old_column_spec in get_ks_columns()
 
         self.node1.run_cqlsh('ALTER TABLE ks.test ALTER i TYPE text;')
         self.session.cluster.refresh_table_metadata("ks", "test")
 
         new_columns = get_ks_columns()
-        self.assertNotIn(old_column_spec, new_columns)
-        self.assertIn([u'test', u'i',
-                       u'text'],
-                      new_columns)
+        assert old_column_spec not in new_columns
+        assert ['test', 'i', 'text'] in new_columns
 
     def test_use_keyspace(self):
         # ks1 contains ks1table, ks2 contains ks2table
@@ -1873,16 +1881,16 @@ class CqlshSmokeTest(Tester):
             USE ks1;
             DESCRIBE TABLES;
             ''')
-        self.assertEqual([x for x in ks1_stdout.split() if x], ['ks1table'])
-        self.assertEqual(ks1_stderr, '')
+        assert [x for x in ks1_stdout.split() if x] == ['ks1table']
+        assert ks1_stderr == ''
 
         ks2_stdout, ks2_stderr, _ = self.node1.run_cqlsh(
             '''
             USE ks2;
             DESCRIBE TABLES;
             ''')
-        self.assertEqual([x for x in ks2_stdout.split() if x], ['ks2table'])
-        self.assertEqual(ks2_stderr, '')
+        assert [x for x in ks2_stdout.split() if x] == ['ks2table']
+        assert ks2_stderr == ''
 
     # DROP INDEX statement fails in 2.0 (see CASSANDRA-9247)
     def test_drop_index(self):
@@ -1896,7 +1904,7 @@ class CqlshSmokeTest(Tester):
             return self.session.execute(requires_index)
 
         # make sure it fails as expected
-        self.assertRaises(InvalidRequest, execute_requires_index)
+        pytest.raises(InvalidRequest, execute_requires_index)
 
         # make sure it doesn't fail when an index exists
         self.session.execute('CREATE INDEX index_to_drop ON test (i);')
@@ -1904,7 +1912,7 @@ class CqlshSmokeTest(Tester):
 
         # drop the index via cqlsh, then make sure it fails
         self.node1.run_cqlsh('DROP INDEX ks.index_to_drop;')
-        self.assertRaises(InvalidRequest, execute_requires_index)
+        pytest.raises(InvalidRequest, execute_requires_index)
 
     # DROP INDEX statement fails in 2.0 (see CASSANDRA-9247)
     def test_create_index(self):
@@ -1918,7 +1926,7 @@ class CqlshSmokeTest(Tester):
             return self.session.execute(requires_index)
 
         # make sure it fails as expected
-        self.assertRaises(InvalidRequest, execute_requires_index)
+        pytest.raises(InvalidRequest, execute_requires_index)
 
         # make sure index exists after creating via cqlsh
         self.node1.run_cqlsh('CREATE INDEX index_to_drop ON ks.test (i);')
@@ -1926,15 +1934,15 @@ class CqlshSmokeTest(Tester):
 
         # drop the index, then make sure it fails again
         self.session.execute('DROP INDEX ks.index_to_drop;')
-        self.assertRaises(InvalidRequest, execute_requires_index)
+        pytest.raises(InvalidRequest, execute_requires_index)
 
     def get_keyspace_names(self):
         self.session.cluster.refresh_schema_metadata()
-        return [ks.name for ks in 
self.session.cluster.metadata.keyspaces.values()]
+        return [ks.name for ks in 
list(self.session.cluster.metadata.keyspaces.values())]
 
     def get_tables_in_keyspace(self, keyspace):
         self.session.cluster.refresh_schema_metadata()
-        return [table.name for table in 
self.session.cluster.metadata.keyspaces[keyspace].tables.values()]
+        return [table.name for table in 
list(self.session.cluster.metadata.keyspaces[keyspace].tables.values())]
 
 
 class CqlLoginTest(Tester):
@@ -1942,12 +1950,13 @@ class CqlLoginTest(Tester):
     Tests login which requires password authenticator
     """
 
-    def setUp(self):
-        super(CqlLoginTest, self).setUp()
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_cluster_setup(self, fixture_dtest_setup):
+        cluster = fixture_dtest_setup.cluster
         config = {'authenticator': 
'org.apache.cassandra.auth.PasswordAuthenticator'}
-        self.cluster.set_configuration_options(values=config)
-        self.cluster.populate(1).start(wait_for_binary_proto=True)
-        [self.node1] = self.cluster.nodelist()
+        cluster.set_configuration_options(values=config)
+        cluster.populate(1).start(wait_for_binary_proto=True)
+        [self.node1] = cluster.nodelist()
         self.node1.watch_log_for('Created default superuser')
         self.session = self.patient_cql_connection(self.node1, 
user='cassandra', password='cassandra')
 
@@ -1956,7 +1965,7 @@ class CqlLoginTest(Tester):
                    if self.cluster.version() >= LooseVersion('3.10')
                    else "Username and/or password are incorrect")
 
-        self.assertEqual([message in x for x in input.split("\n") if x], 
[True])
+        assert [message in x for x in input.split("\n") if x] == [True]
 
     def test_login_keeps_keyspace(self):
         create_ks(self.session, 'ks1', 1)
@@ -1971,8 +1980,8 @@ class CqlLoginTest(Tester):
             DESCRIBE TABLES;
             ''',
             cqlsh_options=['-u', 'cassandra', '-p', 'cassandra'])
-        self.assertEqual([x for x in cqlsh_stdout.split() if x], ['ks1table', 
'ks1table'])
-        self.assertEqual(cqlsh_stderr, '')
+        assert [x for x in cqlsh_stdout.split() if x] == ['ks1table', 'ks1table']
+        assert cqlsh_stderr == ''
 
     def test_login_rejects_bad_pass(self):
         create_ks(self.session, 'ks1', 1)
@@ -2009,7 +2018,7 @@ class CqlLoginTest(Tester):
             query,
             cqlsh_options=['-u', 'cassandra', '-p', 'cassandra'])
 
-        err_lines = cqlsh_stderr.splitlines()
+        err_lines = str(cqlsh_stderr).splitlines()
         for err_line in err_lines:
             if expected_error in err_line:
                 break
@@ -2030,7 +2039,7 @@ class CqlLoginTest(Tester):
             DESCRIBE TABLES;
             ''',
             cqlsh_options=['-u', 'cassandra', '-p', 'cassandra'])
-        self.assertEqual([x for x in cqlsh_stdout.split() if x], ['ks1table'])
+        assert [x for x in cqlsh_stdout.split() if x] == ['ks1table']
         self.assert_login_not_allowed('user1', cqlsh_stderr)
 
     @since('2.2')
@@ -2047,5 +2056,5 @@ class CqlLoginTest(Tester):
             LIST ROLES;
             ''',
             cqlsh_options=['-u', 'cassandra', '-p', 'cassandra'])
-        self.assertTrue('super' in out)
-        self.assertEqual('', err)
+        assert 'super' in out
+        assert '' == err

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/cqlsh_tests/cqlsh_tools.py
----------------------------------------------------------------------
diff --git a/cqlsh_tests/cqlsh_tools.py b/cqlsh_tests/cqlsh_tools.py
index 6a592a0..d7aca97 100644
--- a/cqlsh_tests/cqlsh_tools.py
+++ b/cqlsh_tests/cqlsh_tools.py
@@ -2,7 +2,6 @@ import csv
 import random
 
 import cassandra
-from nose.tools import assert_items_equal
 
 
 class DummyColorMap(object):
@@ -25,7 +24,7 @@ def csv_rows(filename, delimiter=None):
 
 def assert_csvs_items_equal(filename1, filename2):
     with open(filename1, 'r') as x, open(filename2, 'r') as y:
-        assert_items_equal(list(x.readlines()), list(y.readlines()))
+        assert list(x.readlines()) == list(y.readlines())
 
 
 def random_list(gen=None, n=None):

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/delete_insert_test.py
----------------------------------------------------------------------
diff --git a/delete_insert_test.py b/delete_insert_test.py
index efc5927..e0c4bbc 100644
--- a/delete_insert_test.py
+++ b/delete_insert_test.py
@@ -1,21 +1,22 @@
 import random
 import threading
 import uuid
+import logging
 
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
-from nose.tools import assert_equal
 
 from dtest import Tester, create_ks
 
+logger = logging.getLogger(__name__)
 
-class DeleteInsertTest(Tester):
+
+class TestDeleteInsert(Tester):
     """
     Examines scenarios around deleting data and adding data back with the same 
key
     """
     # Generate 1000 rows in memory so we can re-use the same ones over again:
-    groups = ['group1', 'group2', 'group3', 'group4']
-    rows = [(str(uuid.uuid1()), x, random.choice(groups)) for x in range(1000)]
+    rows = [(str(uuid.uuid1()), x, random.choice(['group1', 'group2', 
'group3', 'group4'])) for x in range(1000)]
 
     def create_ddl(self, session, rf={'dc1': 2, 'dc2': 2}):
         create_ks(session, 'delete_insert_search_test', rf)
@@ -36,7 +37,7 @@ class DeleteInsertTest(Tester):
         for row in rows:
             session.execute("INSERT INTO test (id, val1, group) VALUES (%s, 
'%s', '%s')" % row)
 
-    def delete_insert_search_test(self):
+    def test_delete_insert_search(self):
         cluster = self.cluster
         cluster.populate([2, 2]).start(wait_for_binary_proto=True)
         node1 = cluster.nodelist()[0]
@@ -63,9 +64,11 @@ class DeleteInsertTest(Tester):
 
             def run(self):
                 session = self.connection
-                query = SimpleStatement("SELECT * FROM 
delete_insert_search_test.test WHERE group = 'group2'", 
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
+                query = SimpleStatement("SELECT * FROM 
delete_insert_search_test.test WHERE group = 'group2'",
+                                        
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
                 rows = session.execute(query)
-                assert_equal(len(list(rows)), len(deleted), "Expecting the 
length of {} to be equal to the length of {}.".format(list(rows), deleted))
+                assert len(list(rows)) == len(deleted), "Expecting the length 
of {} to be equal to the " \
+                                                        "length of 
{}.".format(list(rows), deleted)
 
         threads = []
         for x in range(20):

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/deletion_test.py
----------------------------------------------------------------------
diff --git a/deletion_test.py b/deletion_test.py
index d4258ad..9c832b3 100644
--- a/deletion_test.py
+++ b/deletion_test.py
@@ -1,14 +1,17 @@
 import time
+import logging
 
 from dtest import Tester, create_ks, create_cf
 from tools.data import rows_to_list
 from tools.jmxutils import (JolokiaAgent, make_mbean,
                             remove_perf_disable_shared_mem)
 
+logger = logging.getLogger(__name__)
+
 
 class TestDeletion(Tester):
 
-    def gc_test(self):
+    def test_gc(self):
         """
         Test that tombstone purging doesn't bring back deleted data by writing
         2 rows to a table with gc_grace=0, deleting one of those rows, then
@@ -29,23 +32,20 @@ class TestDeletion(Tester):
         session.execute('insert into cf (key, c1) values (2,1)')
         node1.flush()
 
-        self.assertEqual(rows_to_list(session.execute('select * from cf;')),
-                         [[1, 1], [2, 1]])
+        assert rows_to_list(session.execute('select * from cf;')) == [[1, 1], 
[2, 1]]
 
         session.execute('delete from cf where key=1')
 
-        self.assertEqual(rows_to_list(session.execute('select * from cf;')),
-                         [[2, 1]])
+        assert rows_to_list(session.execute('select * from cf;')) == [[2, 1]]
 
         node1.flush()
         time.sleep(.5)
         node1.compact()
         time.sleep(.5)
 
-        self.assertEqual(rows_to_list(session.execute('select * from cf;')),
-                         [[2, 1]])
+        assert rows_to_list(session.execute('select * from cf;')) == [[2, 1]]
 
-    def tombstone_size_test(self):
+    def test_tombstone_size(self):
         self.cluster.populate(1)
         node1 = self.cluster.nodelist()[0]
 
@@ -61,8 +61,8 @@ class TestDeletion(Tester):
         for i in range(100):
             session.execute(stmt, [i])
 
-        self.assertEqual(memtable_count(node1, 'ks', 'test'), 100)
-        self.assertGreater(memtable_size(node1, 'ks', 'test'), 0)
+        assert memtable_count(node1, 'ks', 'test') == 100
+        assert memtable_size(node1, 'ks', 'test') > 0
 
 
 def memtable_size(node, keyspace, table):

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/disk_balance_test.py
----------------------------------------------------------------------
diff --git a/disk_balance_test.py b/disk_balance_test.py
index 9eed377..a32ced9 100644
--- a/disk_balance_test.py
+++ b/disk_balance_test.py
@@ -2,16 +2,21 @@ import os
 import os.path
 import re
 
+import pytest
+import logging
+
 from ccmlib.node import Node
-from dtest import DISABLE_VNODES, Tester, create_ks, debug
+from dtest import Tester, create_ks
 from tools.assertions import assert_almost_equal
 from tools.data import create_c1c2_table, insert_c1c2, query_c1c2
-from tools.decorators import since
 from tools.jmxutils import (JolokiaAgent, make_mbean,
                             remove_perf_disable_shared_mem)
 from tools.misc import new_node
 from compaction_test import grep_sstables_in_each_level
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 @since('3.2')
 class TestDiskBalance(Tester):
@@ -19,62 +24,63 @@ class TestDiskBalance(Tester):
     @jira_ticket CASSANDRA-6696
     """
 
-    def disk_balance_stress_test(self):
+    def test_disk_balance_stress(self):
         cluster = self.cluster
-        if not DISABLE_VNODES:
+        if self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 256})
         cluster.populate(4).start(wait_for_binary_proto=True)
         node1 = cluster.nodes['node1']
 
-        node1.stress(['write', 'n=50k', 'no-warmup', '-rate', 'threads=100', 
'-schema', 'replication(factor=3)', 
'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)'])
+        node1.stress(['write', 'n=50k', 'no-warmup', '-rate', 'threads=100', 
'-schema', 'replication(factor=3)',
+                      
'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)'])
         cluster.flush()
         # make sure the data directories are balanced:
         for node in cluster.nodelist():
             self.assert_balanced(node)
 
-    def disk_balance_bootstrap_test(self):
+    def test_disk_balance_bootstrap(self):
         cluster = self.cluster
-        if not DISABLE_VNODES:
+        if self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 256})
         # apparently we have legitimate errors in the log when bootstrapping 
(see bootstrap_test.py)
-        self.allow_log_errors = True
+        self.fixture_dtest_setup.allow_log_errors = True
         cluster.populate(4).start(wait_for_binary_proto=True)
         node1 = cluster.nodes['node1']
 
-        node1.stress(['write', 'n=50k', 'no-warmup', '-rate', 'threads=100', 
'-schema', 'replication(factor=3)', 
'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)'])
+        node1.stress(['write', 'n=50k', 'no-warmup', '-rate', 'threads=100', 
'-schema', 'replication(factor=3)',
+                      
'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)'])
         cluster.flush()
         node5 = new_node(cluster)
         node5.start(wait_for_binary_proto=True)
         self.assert_balanced(node5)
 
-
-    def disk_balance_replace_same_address_test(self):
+    def test_disk_balance_replace_same_address(self):
         self._test_disk_balance_replace(same_address=True)
 
-    def disk_balance_replace_different_address_test(self):
+    def test_disk_balance_replace_different_address(self):
         self._test_disk_balance_replace(same_address=False)
 
     def _test_disk_balance_replace(self, same_address):
-        debug("Creating cluster")
+        logger.debug("Creating cluster")
         cluster = self.cluster
-        if not DISABLE_VNODES:
+        if self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 256})
         # apparently we have legitimate errors in the log when bootstrapping 
(see bootstrap_test.py)
-        self.allow_log_errors = True
+        self.fixture_dtest_setup.allow_log_errors = True
         cluster.populate(4).start(wait_for_binary_proto=True)
         node1 = cluster.nodes['node1']
 
-        debug("Populating")
+        logger.debug("Populating")
         node1.stress(['write', 'n=50k', 'no-warmup', '-rate', 'threads=100', 
'-schema', 'replication(factor=3)', 
'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)'])
         cluster.flush()
 
-        debug("Stopping and removing node2")
+        logger.debug("Stopping and removing node2")
         node2 = cluster.nodes['node2']
         node2.stop(gently=False)
         self.cluster.remove(node2)
 
         node5_address = node2.address() if same_address else '127.0.0.5'
-        debug("Starting replacement node")
+        logger.debug("Starting replacement node")
         node5 = Node('node5', cluster=self.cluster, auto_bootstrap=True,
                      thrift_interface=None, storage_interface=(node5_address, 
7000),
                      jmx_port='7500', remote_debug_port='0', 
initial_token=None,
@@ -84,17 +90,18 @@ class TestDiskBalance(Tester):
                     wait_for_binary_proto=True,
                     wait_other_notice=True)
 
-        debug("Checking replacement node is balanced")
+        logger.debug("Checking replacement node is balanced")
         self.assert_balanced(node5)
 
-    def disk_balance_decommission_test(self):
+    def test_disk_balance_decommission(self):
         cluster = self.cluster
-        if not DISABLE_VNODES:
+        if self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 256})
         cluster.populate(4).start(wait_for_binary_proto=True)
         node1 = cluster.nodes['node1']
         node4 = cluster.nodes['node4']
-        node1.stress(['write', 'n=50k', 'no-warmup', '-rate', 'threads=100', 
'-schema', 'replication(factor=2)', 
'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)'])
+        node1.stress(['write', 'n=50k', 'no-warmup', '-rate', 'threads=100', 
'-schema', 'replication(factor=2)',
+                      
'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)'])
         cluster.flush()
 
         node4.decommission()
@@ -105,7 +112,7 @@ class TestDiskBalance(Tester):
         for node in cluster.nodelist():
             self.assert_balanced(node)
 
-    def blacklisted_directory_test(self):
+    def test_blacklisted_directory(self):
         cluster = self.cluster
         cluster.set_datadir_count(3)
         cluster.populate(1)
@@ -118,7 +125,7 @@ class TestDiskBalance(Tester):
         create_c1c2_table(self, session)
         insert_c1c2(session, n=10000)
         node.flush()
-        for k in xrange(0, 10000):
+        for k in range(0, 10000):
             query_c1c2(session, k)
 
         node.compact()
@@ -126,17 +133,17 @@ class TestDiskBalance(Tester):
         with JolokiaAgent(node) as jmx:
             jmx.execute_method(mbean, 'markUnwritable', 
[os.path.join(node.get_path(), 'data0')])
 
-        for k in xrange(0, 10000):
+        for k in range(0, 10000):
             query_c1c2(session, k)
 
         node.nodetool('relocatesstables')
 
-        for k in xrange(0, 10000):
+        for k in range(0, 10000):
             query_c1c2(session, k)
 
-    def alter_replication_factor_test(self):
+    def test_alter_replication_factor(self):
         cluster = self.cluster
-        if not DISABLE_VNODES:
+        if self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 256})
         cluster.populate(3).start(wait_for_binary_proto=True)
         node1 = cluster.nodes['node1']
@@ -159,14 +166,14 @@ class TestDiskBalance(Tester):
         assert_almost_equal(*sums, error=0.1, error_message=node.name)
 
     @since('3.10')
-    def disk_balance_after_boundary_change_stcs_test(self):
+    def test_disk_balance_after_boundary_change_stcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
         self._disk_balance_after_boundary_change_test(lcs=False)
 
     @since('3.10')
-    def disk_balance_after_boundary_change_lcs_test(self):
+    def test_disk_balance_after_boundary_change_lcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
@@ -184,13 +191,13 @@ class TestDiskBalance(Tester):
         """
 
         cluster = self.cluster
-        if not DISABLE_VNODES:
+        if self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 1024})
         num_disks = 5
         cluster.set_datadir_count(num_disks)
         cluster.set_configuration_options(values={'concurrent_compactors': 
num_disks})
 
-        debug("Starting node1 with {} data dirs and 
concurrent_compactors".format(num_disks))
+        logger.debug("Starting node1 with {} data dirs and 
concurrent_compactors".format(num_disks))
         cluster.populate(1).start(wait_for_binary_proto=True)
         [node1] = cluster.nodelist()
 
@@ -204,13 +211,13 @@ class TestDiskBalance(Tester):
         keys_to_write = num_flushes * keys_per_flush
 
         compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if 
lcs else "SizeTieredCompactionStrategy"
-        debug("Writing {} keys in {} flushes 
(compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
+        logger.debug("Writing {} keys in {} flushes 
(compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
         total_keys = num_flushes * keys_per_flush
         current_keys = 0
         while current_keys < total_keys:
             start_key = current_keys + 1
             end_key = current_keys + keys_per_flush
-            debug("Writing keys {}..{} and flushing".format(start_key, 
end_key))
+            logger.debug("Writing keys {}..{} and flushing".format(start_key, 
end_key))
             node1.stress(['write', 'n={}'.format(keys_per_flush), "no-warmup", 
"cl=ALL", "-pop",
                           "seq={}..{}".format(start_key, end_key), "-rate", 
"threads=1", "-schema", "replication(factor=1)",
                           
"compaction(strategy={},enabled=false)".format(compaction_opts)])
@@ -218,28 +225,28 @@ class TestDiskBalance(Tester):
             current_keys = end_key
 
         # Add a new node, so disk boundaries will change
-        debug("Bootstrap node2 and flush")
+        logger.debug("Bootstrap node2 and flush")
         node2 = new_node(cluster, bootstrap=True)
         node2.start(wait_for_binary_proto=True, 
jvm_args=["-Dcassandra.migration_task_wait_in_seconds=10"], 
set_migration_task=False)
         node2.flush()
 
         self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
 
-        debug("Decommissioning node1")
+        logger.debug("Decommissioning node1")
         node1.decommission()
         node1.stop()
 
         self._assert_balanced_after_boundary_change(node2, total_keys, lcs)
 
     @since('3.10')
-    def disk_balance_after_joining_ring_stcs_test(self):
+    def test_disk_balance_after_joining_ring_stcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
         self._disk_balance_after_joining_ring_test(lcs=False)
 
     @since('3.10')
-    def disk_balance_after_joining_ring_lcs_test(self):
+    def test_disk_balance_after_joining_ring_lcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
@@ -257,13 +264,13 @@ class TestDiskBalance(Tester):
         """
 
         cluster = self.cluster
-        if not DISABLE_VNODES:
+        if self.dtest_config.use_vnodes:
             cluster.set_configuration_options(values={'num_tokens': 1024})
         num_disks = 5
         cluster.set_datadir_count(num_disks)
         cluster.set_configuration_options(values={'concurrent_compactors': 
num_disks})
 
-        debug("Starting 3 nodes with {} data dirs and 
concurrent_compactors".format(num_disks))
+        logger.debug("Starting 3 nodes with {} data dirs and 
concurrent_compactors".format(num_disks))
         cluster.populate(3).start(wait_for_binary_proto=True)
         node1 = cluster.nodelist()[0]
 
@@ -272,63 +279,63 @@ class TestDiskBalance(Tester):
         keys_to_write = num_flushes * keys_per_flush
 
         compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if 
lcs else "SizeTieredCompactionStrategy"
-        debug("Writing {} keys in {} flushes 
(compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
+        logger.debug("Writing {} keys in {} flushes 
(compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
         total_keys = num_flushes * keys_per_flush
         current_keys = 0
         while current_keys < total_keys:
             start_key = current_keys + 1
             end_key = current_keys + keys_per_flush
-            debug("Writing keys {}..{} and flushing".format(start_key, 
end_key))
+            logger.debug("Writing keys {}..{} and flushing".format(start_key, 
end_key))
             node1.stress(['write', 'n={}'.format(keys_per_flush), "no-warmup", 
"cl=ALL", "-pop",
                           "seq={}..{}".format(start_key, end_key), "-rate", 
"threads=1", "-schema", "replication(factor=1)",
                           
"compaction(strategy={},enabled=false)".format(compaction_opts)])
             node1.nodetool('flush keyspace1 standard1')
             current_keys = end_key
 
-        debug("Stopping node1")
+        logger.debug("Stopping node1")
         node1.stop()
 
-        debug("Starting node1 without joining ring")
+        logger.debug("Starting node1 without joining ring")
         node1.start(wait_for_binary_proto=True, wait_other_notice=False, 
join_ring=False,
                     jvm_args=["-Dcassandra.load_ring_state=false", 
"-Dcassandra.write_survey=true"])
 
-        debug("Joining node1 to the ring")
+        logger.debug("Joining node1 to the ring")
         node1.nodetool("join")
         node1.nodetool("join")  # Need to run join twice - one to join ring, 
another to leave write survey mode
 
         self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
 
     def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
-        debug("Cleanup {}".format(node.name))
+        logger.debug("Cleanup {}".format(node.name))
         node.cleanup()
 
-        debug("Enabling compactions on {} now that boundaries 
changed".format(node.name))
+        logger.debug("Enabling compactions on {} now that boundaries 
changed".format(node.name))
         node.nodetool('enableautocompaction')
 
-        debug("Waiting for compactions on {}".format(node.name))
+        logger.debug("Waiting for compactions on {}".format(node.name))
         node.wait_for_compactions()
 
-        debug("Disabling compactions on {} should not block 
forever".format(node.name))
+        logger.debug("Disabling compactions on {} should not block 
forever".format(node.name))
         node.nodetool('disableautocompaction')
 
-        debug("Major compact {} and check disks are 
balanced".format(node.name))
+        logger.debug("Major compact {} and check disks are 
balanced".format(node.name))
         node.compact()
 
         node.wait_for_compactions()
         self.assert_balanced(node)
 
-        debug("Reading data back ({} keys)".format(total_keys))
+        logger.debug("Reading data back ({} keys)".format(total_keys))
         node.stress(['read', 'n={}'.format(total_keys), "no-warmup", "cl=ALL", 
"-pop", "seq=1...{}".format(total_keys), "-rate", "threads=1"])
 
         if lcs:
             output = grep_sstables_in_each_level(node, "standard1")
-            debug("SSTables in each level: {}".format(output))
+            logger.debug("SSTables in each level: {}".format(output))
 
             # [0, ?/, 0, 0, 0, 0...]
             p = re.compile(r'(\d+)(/\d+)?,\s(\d+).*')
             m = p.search(output)
             cs_count = int(m.group(1)) + int(m.group(3))
             sstable_count = len(node.get_sstables('keyspace1', 'standard1'))
-            debug("Checking that compaction strategy sstable # ({}) is equal 
to actual # ({})".format(cs_count, sstable_count))
-            self.assertEqual(sstable_count, cs_count)
-            self.assertFalse(node.grep_log("is already present on leveled 
manifest"))
+            logger.debug("Checking that compaction strategy sstable # ({}) is 
equal to actual # ({})".format(cs_count, sstable_count))
+            assert sstable_count == cs_count
+            assert not node.grep_log("is already present on leveled manifest")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to