Repository: cassandra-dtest
Updated Branches:
  refs/heads/master 4e1c05565 -> 0d9c98ee1


Transient Replication and Cheap Quorums tests

Patch by Blake Eggleston, Alex Petrov, Ariel Weisberg; Reviewed by Blake 
Eggleston for CASSANDRA-14404

Co-authored-by: Blake Eggleston <[email protected]>
Co-authored-by: Ariel Weisberg <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/0d9c98ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/0d9c98ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/0d9c98ee

Branch: refs/heads/master
Commit: 0d9c98ee1ec006604e4f8f1787f7be5b5792cf78
Parents: 4e1c055
Author: Alex Petrov <[email protected]>
Authored: Fri Sep 14 14:32:31 2018 +0200
Committer: Alex Petrov <[email protected]>
Committed: Mon Sep 17 17:29:20 2018 +0200

----------------------------------------------------------------------
 transient_replication_ring_test.py | 502 ++++++++++++++++++++++++
 transient_replication_test.py      | 653 ++++++++++++++++++++++++++++++++
 2 files changed, 1155 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/0d9c98ee/transient_replication_ring_test.py
----------------------------------------------------------------------
diff --git a/transient_replication_ring_test.py 
b/transient_replication_ring_test.py
new file mode 100644
index 0000000..a3b596e
--- /dev/null
+++ b/transient_replication_ring_test.py
@@ -0,0 +1,502 @@
+import logging
+import types
+
+from cassandra import ConsistencyLevel
+from cassandra.query import SimpleStatement
+from ccmlib.node import Node
+
+from dtest import Tester
+from tools.assertions import (assert_all)
+
+from flaky import flaky
+
+from cassandra.metadata import BytesToken, OrderedDict
+import pytest
+from itertools import chain
+from tools.misc import new_node
+
+logging.getLogger('cassandra').setLevel(logging.CRITICAL)
+
+NODELOCAL = 11
+
+def jmx_start(to_start, **kwargs):
+    kwargs['jvm_args'] = kwargs.get('jvm_args', []) + 
['-XX:-PerfDisableSharedMem']
+    to_start.start(**kwargs)
+
+
+def gen_expected(*values):
+    return [["%05d" % i, i, i] for i in chain(*values)]
+
+
+def repair_nodes(nodes):
+    for node in nodes:
+        node.nodetool('repair -pr')
+
+def cleanup_nodes(nodes):
+    for node in nodes:
+        node.nodetool('cleanup')
+
+def patch_start(startable):
+    old_start = startable.start
+
+    def new_start(self, *args, **kwargs):
+        kwargs['jvm_args'] = kwargs.get('jvm_args', []) + 
['-XX:-PerfDisableSharedMem'
+                                                           ' 
-Dcassandra.enable_nodelocal_queries=true']
+        return old_start(*args, **kwargs)
+
+    startable.start = types.MethodType(new_start, startable)
+
+class TestTransientReplicationRing(Tester):
+
+    keyspace = "ks"
+    table = "tbl"
+
+    def select(self):
+        return "SELECT * from %s.%s" % (self.keyspace, self.table)
+
+    def select_statement(self):
+        return SimpleStatement(self.select(), consistency_level=NODELOCAL)
+
+    def point_select(self):
+        return "SELECT * from %s.%s where pk = %%s" % (self.keyspace, 
self.table)
+
+    def point_select_statement(self):
+        return SimpleStatement(self.point_select(), 
consistency_level=NODELOCAL)
+
+    def check_expected(self, sessions, expected, node=[i for i in 
range(0,1000)], cleanup=False):
+        """Check that each node has the expected values present"""
+        for idx, session, expect, node in zip(range(0, 1000), sessions, 
expected, node):
+            print("Checking idx " + str(idx))
+            print(str([row for row in 
session.execute(self.select_statement())]))
+            if cleanup:
+                node.nodetool('cleanup')
+            assert_all(session,
+                       self.select(),
+                       expect,
+                       cl=NODELOCAL)
+
+    def check_replication(self, sessions, exactly=None, gte=None, lte=None):
+        """Assert that the test values are replicated a required number of 
times"""
+        for i in range(0, 40):
+            count = 0
+            for session in sessions:
+                for row in session.execute(self.point_select_statement(), 
["%05d" % i]):
+                    count += 1
+            if exactly:
+                assert count == exactly, "Wrong replication for %05d should be 
exactly" % (i, exactly)
+            if gte:
+                assert count >= gte, "Count for %05d should be >= %d" % (i, 
gte)
+            if lte:
+                assert count <= lte, "Count for %05d should be <= %d" % (i, 
lte)
+
+    @pytest.fixture
+    def cheap_quorums(self):
+        return False
+
+    @pytest.fixture(scope='function', autouse=True)
+    def setup_cluster(self, fixture_dtest_setup):
+        self.tokens = ['00010', '00020', '00030']
+
+        patch_start(self.cluster)
+        
self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+                                                       'num_tokens': 1,
+                                                       
'commitlog_sync_period_in_ms': 500,
+                                                       
'enable_transient_replication': True,
+                                                       'partitioner' : 
'org.apache.cassandra.dht.OrderPreservingPartitioner'})
+        print("CLUSTER INSTALL DIR: ")
+        print(self.cluster.get_install_dir())
+        self.cluster.populate(3, tokens=self.tokens, debug=True, 
install_byteman=True)
+        # self.cluster.populate(3, debug=True, install_byteman=True)
+        self.cluster.start(wait_other_notice=True, wait_for_binary_proto=True, 
jvm_args=['-Dcassandra.enable_nodelocal_queries=true'])
+
+        # enable shared memory
+        for node in self.cluster.nodelist():
+            patch_start(node)
+            print(node.logfilename())
+
+        self.nodes = self.cluster.nodelist()
+        self.node1, self.node2, self.node3 = self.nodes
+        session = self.exclusive_cql_connection(self.node3)
+
+        replication_params = OrderedDict()
+        replication_params['class'] = 'NetworkTopologyStrategy'
+        replication_params['datacenter1'] = '3/1'
+        replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in 
replication_params.items())
+
+        session.execute("CREATE KEYSPACE %s WITH REPLICATION={%s}" % 
(self.keyspace, replication_params))
+        print("CREATE KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, 
replication_params))
+        self.create_table(session)
+
+    def create_table(self, session, never_speculate=False):
+        if never_speculate:
+            session.execute("CREATE TABLE %s.%s (pk varchar, ck int, value 
int, PRIMARY KEY (pk, ck)) WITH speculative_retry = 'NEVER' AND read_repair = 
'NONE'" % (self.keyspace, self.table))
+        else:
+            session.execute("CREATE TABLE %s.%s (pk varchar, ck int, value 
int, PRIMARY KEY (pk, ck)) WITH read_repair = 'NONE'" % (self.keyspace, 
self.table))
+        print(str(self.node1.run_cqlsh("describe table %s.%s" % 
(self.keyspace, self.table))))
+
+    def quorum(self, session, stmt_str):
+        return session.execute(SimpleStatement(stmt_str, 
consistency_level=ConsistencyLevel.QUORUM))
+
+    def insert_row(self, pk, ck, value, session=None, node=None):
+        session = session or self.exclusive_cql_connection(node or self.node1)
+        #token = BytesToken.from_key(pack('>i', pk)).value
+        #assert token < BytesToken.from_string(self.tokens[0]).value or 
BytesToken.from_string(self.tokens[-1]).value < token   # primary replica 
should be node1
+        #TODO Is quorum really right? Maybe we want ALL with retries, 
since we really don't want the data
+        #to be missing from a replica unless that is intentional
+        self.quorum(session, "INSERT INTO %s.%s (pk, ck, value) VALUES 
('%05d', %s, %s)" % (self.keyspace, self.table, pk, ck, value))
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_bootstrap_and_cleanup(self):
+        """Test bootstrapping a new node across a mix of repaired and 
unrepaired data"""
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3]
+
+        for i in range(0, 40, 2):
+            self.insert_row(i, i, i, main_session)
+
+        sessions = [self.exclusive_cql_connection(node) for node in 
[self.node1, self.node2, self.node3]]
+
+        expected = [gen_expected(range(0, 11, 2), range(22, 40, 2)),
+                    gen_expected(range(0, 22, 2), range(32, 40, 2)),
+                    gen_expected(range(12, 31, 2))]
+        self.check_expected(sessions, expected)
+
+        #Make sure at least a little data is repaired, this shouldn't move 
data anywhere
+        repair_nodes(nodes)
+
+        self.check_expected(sessions, expected)
+
+        #Ensure that there is at least some transient data around, so that 
if it's missing after bootstrap
+        #we know we failed to get it from the transient replica losing the 
range entirely
+        nodes[1].stop(wait_other_notice=True)
+
+        for i in range(1, 40, 2):
+            self.insert_row(i, i, i, main_session)
+
+        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+
+        sessions = [self.exclusive_cql_connection(node) for node in 
[self.node1, self.node2, self.node3]]
+
+        expected = [gen_expected(range(0, 11), range(11, 20, 2), range(21, 
40)),
+                    gen_expected(range(0, 21, 2), range(32, 40, 2)),
+                    gen_expected(range(1, 11, 2), range(11, 31), range(31, 40, 
2))]
+
+        #Every node should have some of its fully replicated data and one and 
two should have some transient data
+        self.check_expected(sessions, expected)
+
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        nodes.append(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+
+        expected.append(gen_expected(range(11, 20, 2), range(21, 40)))
+        sessions.append(self.exclusive_cql_connection(node4))
+
+        #Because repair was never run and nodes had transient data it will 
have data for transient ranges (node1, 11-20)
+        assert_all(sessions[3],
+                   self.select(),
+                   expected[3],
+                   cl=NODELOCAL)
+
+        #Node1 no longer transiently replicates 11-20, so cleanup will clean 
it up
+        #Node1 also now transiently replicates 21-30 and half the values in 
that range were repaired
+        expected[0] = gen_expected(range(0, 11), range(21, 30, 2), range(31, 
40))
+        #Node2 is still missing data since it was down during some insertions; it 
also lost some range (31-40)
+        expected[1] = gen_expected(range(0, 21, 2))
+        expected[2] = gen_expected(range(1, 11, 2), range(11, 31))
+
+        #Cleanup should only have an impact if a node lost a range entirely or started 
to transiently replicate it and the data
+        #was repaired
+        self.check_expected(sessions, expected, nodes, cleanup=True)
+
+        repair_nodes(nodes)
+
+        expected = [gen_expected(range(0, 11), range(31, 40)),
+                    gen_expected(range(0, 21)),
+                    gen_expected(range(11, 31)),
+                    gen_expected(range(21, 40))]
+
+        self.check_expected(sessions, expected, nodes, cleanup=True)
+
+        #Every value should be replicated exactly 2 times
+        self.check_replication(sessions, exactly=2)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def move_test(self, move_token, expected_after_move, 
expected_after_repair):
+        """Helper method to run a move test cycle"""
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3, node4]
+
+        for i in range(0, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        # Make sure at least a little data is repaired
+        repair_nodes(nodes)
+
+        # Ensure that there is at least some transient data around, so that 
if it's missing after bootstrap
+        # we know we failed to get it from the transient replica losing the 
range entirely
+        nodes[1].stop(wait_other_notice=True)
+
+        for i in range(1, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        sessions = [self.exclusive_cql_connection(node) for node in 
[self.node1, self.node2, self.node3, node4]]
+
+        expected = [gen_expected(range(0, 11), range(31, 40)),
+                    gen_expected(range(0, 21, 2)),
+                    gen_expected(range(1, 11, 2), range(11, 31)),
+                    gen_expected(range(11, 20, 2), range(21, 40))]
+        self.check_expected(sessions, expected)
+        self.check_replication(sessions, exactly=2)
+
+        nodes[0].nodetool('move %s' % move_token)
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, gte=2, lte=3)
+        self.check_expected(sessions, expected=expected_after_move)
+
+        repair_nodes(nodes)
+
+        self.check_expected(sessions, expected_after_repair, nodes, 
cleanup=True)
+        self.check_replication(sessions, exactly=2)
+
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_forwards_between_and_cleanup(self):
+        """Test moving a node forwards past a neighbor token"""
+        move_token = '00025'
+        expected_after_move = [gen_expected(range(0, 26), range(31, 40, 2)),
+                               gen_expected(range(0, 21, 2), range(31, 40)),
+                               gen_expected(range(1, 11, 2), range(11, 21, 2), 
range(21,31)),
+                               gen_expected(range(21, 26, 2), range(26, 40))]
+        expected_after_repair = [gen_expected(range(0, 26)),
+                                 gen_expected(range(0, 21), range(31, 40)),
+                                 gen_expected(range(21, 31),),
+                                 gen_expected(range(26, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_forwards_and_cleanup(self):
+        """Test moving a node forwards without going past a neighbor token"""
+        move_token = '00015'
+        expected_after_move = [gen_expected(range(0, 16), range(31, 40)),
+                               gen_expected(range(0, 21, 2)),
+                               gen_expected(range(1, 16, 2), range(16, 31)),
+                               gen_expected(range(17, 20, 2), range(21, 40))]
+        expected_after_repair = [gen_expected(range(0, 16), range(31, 40)),
+                                 gen_expected(range(0, 21)),
+                                 gen_expected(range(16, 31)),
+                                 gen_expected(range(21, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_backwards_between_and_cleanup(self):
+        """Test moving a node backwards past its preceding neighbor's token"""
+        move_token = '00035'
+        expected_after_move = [gen_expected(range(1, 21, 2), range(21, 36)),
+                               gen_expected(range(0, 21, 2), range(36, 40)),
+                               gen_expected(range(0, 31), range(37, 40, 2)),
+                               gen_expected(range(21, 30, 2), range(31, 40))]
+        expected_after_repair = [gen_expected(range(21, 36)),
+                                 gen_expected(range(0, 21), range(36, 40)),
+                                 gen_expected(range(0, 31)),
+                                 gen_expected(range(31, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_backwards_and_cleanup(self):
+        """Test moving a node backwards without moving past a neighbor token"""
+        move_token = '00005'
+        expected_after_move = [gen_expected(range(0, 6), range(31, 40)),
+                               gen_expected(range(0, 21, 2)),
+                               gen_expected(range(1, 6, 2), range(6, 31)),
+                               gen_expected(range(7, 20, 2), range(21, 40))]
+        expected_after_repair = [gen_expected(range(0, 6), range(31, 40)),
+                                 gen_expected(range(0, 21)),
+                                 gen_expected(range(6, 31)),
+                                 gen_expected(range(21, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_decommission(self):
+        """Test decommissioning a node correctly streams out all the data"""
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3, node4]
+
+        for i in range(0, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        # Make sure at least a little data is repaired
+        repair_nodes(nodes)
+
+        # Ensure that there is at least some transient data around, so that 
if it's missing after bootstrap
+        # we know we failed to get it from the transient replica losing the 
range entirely
+        nodes[1].stop(wait_other_notice=True)
+
+        for i in range(1, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        sessions = [self.exclusive_cql_connection(node) for node in 
[self.node1, self.node2, self.node3, node4]]
+
+        expected = [gen_expected(range(0, 11), range(31, 40)),
+                    gen_expected(range(0, 21, 2)),
+                    gen_expected(range(1, 11, 2), range(11, 31)),
+                    gen_expected(range(11, 20, 2), range(21, 40))]
+
+        self.check_expected(sessions, expected)
+
+        #node1 has transient data we want to see streamed out on move
+        nodes[3].nodetool('decommission')
+
+        nodes = nodes[:-1]
+        sessions = sessions[:-1]
+
+        expected = [gen_expected(range(0, 11), range(11, 21, 2), range(21, 
40)),
+                    gen_expected(range(0, 21, 2), range(21, 30, 2), range(31, 
40)),
+                    gen_expected(range(1, 11, 2), range(11, 31), range(31, 40, 
2))]
+
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, gte=2, lte=3)
+        self.check_expected(sessions, expected)
+
+        repair_nodes(nodes)
+
+        #There should be no transient data anywhere
+        expected = [gen_expected(range(0, 11), range(21, 40)),
+                    gen_expected(range(0, 21), range(31, 40)),
+                    gen_expected(range(11, 31))]
+
+        self.check_expected(sessions, expected, nodes, cleanup=True)
+        self.check_replication(sessions, exactly=2)
+
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_remove(self):
+        """Test a mix of ring change operations across a mix of transient and 
repaired/unrepaired data"""
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3]
+
+        #We want the node being removed to have no data on it so nodetool 
remove always gets all the necessary data
+        #from survivors
+        node4_id = node4.nodetool('info').stdout[25:61]
+        node4.stop(wait_other_notice=True)
+
+        for i in range(0, 40):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        sessions = [self.exclusive_cql_connection(node) for node in 
[self.node1, self.node2, self.node3]]
+
+        expected = [gen_expected(range(0, 11), range(21, 40)),
+                    gen_expected(range(0, 21), range(31, 40)),
+                    gen_expected(range(11, 31))]
+
+        # Every node should have some of its fully replicated data and one and two 
should have some transient data
+        self.check_expected(sessions, expected)
+
+        nodes[0].nodetool('removenode ' + node4_id)
+
+        #Give streaming time to occur, it's asynchronous from removenode 
completing at other nodes
+        import time
+        time.sleep(15)
+
+        # Every remaining node should have everything
+        expected = [gen_expected(range(0, 40)),
+                    gen_expected(range(0, 40)),
+                    gen_expected(range(0,40))]
+
+        self.check_replication(sessions, exactly=3)
+        self.check_expected(sessions, expected)
+        repair_nodes(nodes)
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, exactly=2)
+
+        expected = [gen_expected(range(0,11), range(21,40)),
+                    gen_expected(range(0,21), range(31, 40)),
+                    gen_expected(range(11,31))]
+        self.check_expected(sessions, expected)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_replace(self):
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+
+        #We want the node being replaced to have no data on it so the 
replacement definitely fetches all the data
+        self.node2.stop(wait_other_notice=True)
+
+        for i in range(0, 40):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        replacement_address = self.node2.address()
+        self.node2.stop(wait_other_notice=True)
+        self.cluster.remove(self.node2)
+        self.node2 = Node('replacement', cluster=self.cluster, 
auto_bootstrap=True,
+                                         thrift_interface=None, 
storage_interface=(replacement_address, 7000),
+                                         jmx_port='7400', 
remote_debug_port='0', initial_token=None, 
binary_interface=(replacement_address, 9042))
+        patch_start(self.node2)
+        nodes = [self.node1, self.node2, self.node3]
+        self.cluster.add(self.node2, False, data_center='datacenter1')
+        jvm_args = ["-Dcassandra.replace_address=%s" % replacement_address,
+                    "-Dcassandra.ring_delay_ms=10000",
+                    "-Dcassandra.broadcast_interval_ms=10000"]
+        self.node2.start(jvm_args=jvm_args, wait_for_binary_proto=True, 
wait_other_notice=True)
+
+        sessions = [self.exclusive_cql_connection(node) for node in 
[self.node1, self.node2, self.node3]]
+
+        # Everyone should have everything
+        expected = [gen_expected(range(0, 40)),
+                    gen_expected(range(0, 40)),
+                    gen_expected(range(0,40))]
+
+        self.check_replication(sessions, exactly=3)
+        self.check_expected(sessions, expected)
+
+        repair_nodes(nodes)
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, exactly=2)
+
+        expected = [gen_expected(range(0,11), range(21,40)),
+                    gen_expected(range(0,21), range(31, 40)),
+                    gen_expected(range(11,31))]
+        self.check_expected(sessions, expected)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/0d9c98ee/transient_replication_test.py
----------------------------------------------------------------------
diff --git a/transient_replication_test.py b/transient_replication_test.py
new file mode 100644
index 0000000..0571678
--- /dev/null
+++ b/transient_replication_test.py
@@ -0,0 +1,653 @@
+import re
+import logging
+import types
+from struct import pack
+from uuid import UUID
+
+from cassandra import ConsistencyLevel, InvalidRequest
+from cassandra.query import SimpleStatement
+from cassandra.protocol import ConfigurationException
+from ccmlib.node import Node
+
+from dtest import Tester
+from tools.misc import ImmutableMapping
+from tools.jmxutils import JolokiaAgent, make_mbean
+from tools.data import rows_to_list
+from tools.assertions import (assert_all, assert_invalid, assert_length_equal,
+                              assert_none, assert_one, assert_unavailable)
+
+from cassandra.metadata import Murmur3Token, OrderedDict
+import pytest
+
+
+logging.getLogger('cassandra').setLevel(logging.CRITICAL)
+
+NODELOCAL = 11
+class SSTable(object):
+
+    def __init__(self, name, repaired, pending_id):
+        self.name = name
+        self.repaired = repaired
+        self.pending_id = pending_id
+
+
+class TableMetrics(object):
+
+    def __init__(self, node, keyspace, table):
+        assert isinstance(node, Node)
+        self.jmx = JolokiaAgent(node)
+        self.write_latency_mbean = make_mbean("metrics", type="Table", 
name="WriteLatency", keyspace=keyspace, scope=table)
+        self.speculative_reads_mbean = make_mbean("metrics", type="Table", 
name="SpeculativeRetries", keyspace=keyspace, scope=table)
+        self.transient_writes_mbean = make_mbean("metrics", type="Table", 
name="TransientWrites", keyspace=keyspace, scope=table)
+
+    @property
+    def write_count(self):
+        return self.jmx.read_attribute(self.write_latency_mbean, "Count")
+
+    @property
+    def speculative_reads(self):
+        return self.jmx.read_attribute(self.speculative_reads_mbean, "Count")
+
+    @property
+    def transient_writes(self):
+        return self.jmx.read_attribute(self.transient_writes_mbean, "Count")
+
+    def start(self):
+        self.jmx.start()
+
+    def stop(self):
+        self.jmx.stop()
+
+    def __enter__(self):
+        """ For contextmanager-style usage. """
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, value, traceback):
+        """ For contextmanager-style usage. """
+        self.stop()
+
+
+class StorageProxy(object):
+
+    def __init__(self, node):
+        assert isinstance(node, Node)
+        self.node = node
+        self.jmx = JolokiaAgent(node)
+        self.mbean = make_mbean("db", type="StorageProxy")
+
+    def start(self):
+        self.jmx.start()
+
+    def stop(self):
+        self.jmx.stop()
+
+    @property
+    def blocking_read_repair(self):
+        return self.jmx.read_attribute(self.mbean, 
"ReadRepairRepairedBlocking")
+
+    @property
+    def speculated_data_request(self):
+        return self.jmx.read_attribute(self.mbean, 
"ReadRepairSpeculatedRequest")
+
+    @property
+    def speculated_data_repair(self):
+        return self.jmx.read_attribute(self.mbean, 
"ReadRepairSpeculatedRepair")
+
+    def __enter__(self):
+        """ For contextmanager-style usage. """
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, value, traceback):
+        """ For contextmanager-style usage. """
+        self.stop()
+
+class StorageService(object):
+
+    def __init__(self, node):
+        assert isinstance(node, Node)
+        self.node = node
+        self.jmx = JolokiaAgent(node)
+        self.mbean = make_mbean("db", type="StorageService")
+
+    def start(self):
+        self.jmx.start()
+
+    def stop(self):
+        self.jmx.stop()
+
+    def get_replicas(self, ks, cf, key):
+        return self.jmx.execute_method(self.mbean, 
"getNaturalEndpointsWithPort(java.lang.String,java.lang.String,java.lang.String,boolean)",
 [ks, cf, key, True])
+
+    def __enter__(self):
+        """ For contextmanager-style usage. """
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, value, traceback):
+        """ For contextmanager-style usage. """
+        self.stop()
+
+def patch_start(startable):
+    old_start = startable.start
+
+    def new_start(self, *args, **kwargs):
+        kwargs['jvm_args'] = kwargs.get('jvm_args', []) + 
['-XX:-PerfDisableSharedMem',
+                                                           
'-Dcassandra.enable_nodelocal_queries=true']
+        return old_start(*args, **kwargs)
+
+    startable.start = types.MethodType(new_start, startable)
+    return startable
+
+def get_sstable_data(cls, node, keyspace):
+    _sstable_name = re.compile('SSTable: (.+)')
+    _repaired_at = re.compile('Repaired at: (\d+)')
+    _pending_repair = re.compile('Pending repair: (\-\-|null|[a-f0-9\-]+)')
+
+    out = node.run_sstablemetadata(keyspace=keyspace).stdout
+
+    def matches(pattern):
+        return filter(None, [pattern.match(l) for l in 
out.decode("utf-8").split('\n')])
+    names = [m.group(1) for m in matches(_sstable_name)]
+    repaired_times = [int(m.group(1)) for m in matches(_repaired_at)]
+
+    def uuid_or_none(s):
+        return None if s == 'null' or s == '--' else UUID(s)
+    pending_repairs = [uuid_or_none(m.group(1)) for m in 
matches(_pending_repair)]
+    assert names
+    assert repaired_times
+    assert pending_repairs
+    assert len(names) == len(repaired_times) == len(pending_repairs)
+    return [SSTable(*a) for a in zip(names, repaired_times, pending_repairs)]
+
+
+class TransientReplicationBase(Tester):
+    """
+    Common cluster setup and helpers for the transient replication tests.
+
+    The autouse ``setup_cluster`` fixture starts a single-token cluster with
+    transient replication enabled and then creates ``keyspace.table``.
+    Subclasses change the topology by overriding ``populate``, ``set_nodes``,
+    ``replication_factor`` and ``tokens``.
+    """
+
+    keyspace = "ks"
+    table = "tbl"
+
+    @pytest.fixture
+    def cheap_quorums(self):
+        """ Fixture value is always False in this base class """
+        return False
+
+    def populate(self):
+        """ Create the cluster nodes (byteman enabled), one per entry of self.tokens """
+        self.cluster.populate(3, tokens=self.tokens, debug=True, install_byteman=True)
+
+    def set_nodes(self):
+        """ Expose the started nodes as node1..node3 and install the digest guard on node3 """
+        self.node1, self.node2, self.node3 = self.nodes
+
+        # Make sure digest is not attempted against the transient node
+        self.node3.byteman_submit(['./byteman/throw_on_digest.btm'])
+
+    def replication_factor(self):
+        """ Replication setting applied to datacenter1 by setup_schema """
+        return '3/1'
+
+    def tokens(self):
+        """
+        Initial tokens, one per node.
+
+        setup_cluster replaces this method on the instance with the list it
+        returns, so afterwards ``self.tokens`` is the list itself.
+        """
+        return [0, 1, 2]
+
+    def setup_schema(self):
+        """ Create the keyspace and a table with speculative retry and read repair turned off """
+        session = self.exclusive_cql_connection(self.node1)
+        replication = OrderedDict()
+        replication['class'] = 'NetworkTopologyStrategy'
+        replication['datacenter1'] = self.replication_factor()
+        replication_cql = ', '.join("'%s': '%s'" % (k, v) for k, v in replication.items())
+        session.execute("CREATE KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_cql))
+        session.execute("CREATE TABLE %s.%s (pk int, ck int, value int, PRIMARY KEY (pk, ck)) "
+                        "WITH speculative_retry = 'NEVER' AND read_repair = 'NONE'"
+                        % (self.keyspace, self.table))
+
+    @pytest.fixture(scope='function', autouse=True)
+    def setup_cluster(self, fixture_dtest_setup):
+        """ Configure, populate and start the cluster, then create the schema """
+        # From here on self.tokens is the token list, not the method;
+        # populate(), insert_row() and delete_row() rely on that.
+        self.tokens = self.tokens()
+
+        patch_start(self.cluster)
+        self.cluster.set_configuration_options(values={
+            'hinted_handoff_enabled': False,
+            'num_tokens': 1,
+            'commitlog_sync_period_in_ms': 500,
+            'enable_transient_replication': True,
+            'dynamic_snitch': False,
+        })
+        self.populate()
+        self.cluster.start(wait_other_notice=True, wait_for_binary_proto=True)
+
+        self.nodes = [patch_start(node) for node in self.cluster.nodelist()]
+        self.set_nodes()
+        self.setup_schema()
+
+    def _sstables_after(self, node, flush, compact):
+        """ Optionally flush and/or compact the test table on node, then return its sstables """
+        if flush:
+            node.flush()
+        if compact:
+            node.nodetool(' '.join(['compact', self.keyspace, self.table]))
+        return node.get_sstables(self.keyspace, self.table)
+
+    def assert_has_sstables(self, node, flush=False, compact=False):
+        """ Assert that node holds at least one sstable for the test table """
+        assert self._sstables_after(node, flush, compact)
+
+    def assert_has_no_sstables(self, node, flush=False, compact=False):
+        """ Assert that node holds no sstables for the test table """
+        assert not self._sstables_after(node, flush, compact)
+
+    def quorum(self, session, stmt_str):
+        """ Execute stmt_str on session at consistency level QUORUM and return the result """
+        return session.execute(SimpleStatement(stmt_str, consistency_level=ConsistencyLevel.QUORUM))
+
+    def nodelocal(self, session, stmt_str):
+        """ Execute stmt_str on session at the NODELOCAL consistency level and return the result """
+        return session.execute(SimpleStatement(stmt_str, consistency_level=NODELOCAL))
+
+    def assert_local_rows(self, node, rows, ignore_order=False):
+        """ Assert that a NODELOCAL full-table read through node returns exactly rows """
+        assert_all(self.exclusive_cql_connection(node),
+                   "SELECT * FROM %s.%s" % (self.keyspace, self.table),
+                   rows,
+                   cl=NODELOCAL,
+                   ignore_order=ignore_order)
+
+    def _assert_primary_replica_is_node1(self, pk):
+        """ Fail unless the Murmur3 token of partition key pk lies outside [tokens[0], tokens[-1]] """
+        token = Murmur3Token.from_key(pack('>i', pk)).value
+        # primary replica should be node1
+        assert token < self.tokens[0] or self.tokens[-1] < token
+
+    def insert_row(self, pk, ck, value, session=None, node=None):
+        """
+        Insert (pk, ck, value) at QUORUM.
+
+        Uses session when given, otherwise a new exclusive connection to node
+        (node1 by default).
+        """
+        session = session or self.exclusive_cql_connection(node or self.node1)
+        self._assert_primary_replica_is_node1(pk)
+        self.quorum(session, "INSERT INTO %s.%s (pk, ck, value) VALUES (%s, %s, %s)"
+                    % (self.keyspace, self.table, pk, ck, value))
+
+    def delete_row(self, pk, ck, session=None, node=None):
+        """
+        Delete row (pk, ck) at QUORUM.
+
+        Uses session when given, otherwise a new exclusive connection to node
+        (node1 by default).
+        """
+        session = session or self.exclusive_cql_connection(node or self.node1)
+        self._assert_primary_replica_is_node1(pk)
+        self.quorum(session, "DELETE FROM %s.%s WHERE pk = %s AND ck = %s"
+                    % (self.keyspace, self.table, pk, ck))
+
+    def read_as_list(self, query, session=None, node=None):
+        """ Run query at QUORUM and return the result converted by rows_to_list """
+        session = session or self.exclusive_cql_connection(node or self.node1)
+        return rows_to_list(self.quorum(session, query))
+
+    def table_metrics(self, node):
+        """ Return a TableMetrics object for the test table on node """
+        return TableMetrics(node, self.keyspace, self.table)
+
+    def split(self, arr):
+        """ Return (items at even positions, items at odd positions) of arr as two lists """
+        items = list(arr)
+        return (items[0::2], items[1::2])
+
+    def generate_rows(self, partitions, rows):
+        """ Return [pk, ck, pk + ck] for every pk below partitions and ck below rows, grouped by ck """
+        return [[pk, ck, pk + ck] for ck in range(rows) for pk in range(partitions)]
+
+
+class TestTransientReplication(TransientReplicationBase):
+    """ Write, read and repair tests on the default three node topology, where node3 is the transient replica """
+
+    def _assert_no_sstables_on_any_node(self):
+        """ Precondition shared by most tests: the table is empty on disk everywhere """
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+    def _assert_only_full_replicas_have_sstables(self):
+        """ After a flush, node1 and node2 must hold sstables while node3 holds none """
+        self.assert_has_sstables(self.node1, flush=True)
+        self.assert_has_sstables(self.node2, flush=True)
+        self.assert_has_no_sstables(self.node3, flush=True)
+
+    @pytest.mark.no_vnodes
+    def test_transient_noop_write(self):
+        """ If both full replicas are available, nothing should be written to the transient replica """
+        self._assert_no_sstables_on_any_node()
+
+        with self.table_metrics(self.node1) as tm1, \
+                self.table_metrics(self.node2) as tm2, \
+                self.table_metrics(self.node3) as tm3:
+            assert tm1.write_count == 0
+            assert tm2.write_count == 0
+            assert tm3.write_count == 0
+            self.insert_row(1, 1, 1)
+            assert tm1.write_count == 1
+            assert tm2.write_count == 1
+            assert tm3.write_count == 0
+
+        self._assert_only_full_replicas_have_sstables()
+
+    @pytest.mark.no_vnodes
+    def test_transient_write(self):
+        """ If write can't succeed on full replica, it's written to the transient node instead """
+        self._assert_no_sstables_on_any_node()
+
+        # The metrics objects are entered exactly as before although this test
+        # does not read any counter from them.
+        with self.table_metrics(self.node1), \
+                self.table_metrics(self.node2), \
+                self.table_metrics(self.node3):
+            self.insert_row(1, 1, 1)
+            # Stop writes to the other full node
+            self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+            self.insert_row(1, 2, 2)
+
+        # node1 should contain both rows
+        self.assert_local_rows(self.node1,
+                               [[1, 1, 1],
+                                [1, 2, 2]])
+
+        # write couldn't succeed on node2, so it has only the first row
+        self.assert_local_rows(self.node2,
+                               [[1, 1, 1]])
+
+        # transient replica should hold only the second row
+        self.assert_local_rows(self.node3,
+                               [[1, 2, 2]])
+
+    @pytest.mark.no_vnodes
+    def test_transient_full_merge_read(self):
+        """ When reading, transient replica should serve a missing read """
+        self._assert_no_sstables_on_any_node()
+
+        self.insert_row(1, 1, 1)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.insert_row(1, 2, 2)
+
+        # Stop reads from the node that will hold the second row
+        self.node1.stop()
+
+        # Whether we're reading from the full node or from the transient node, we should get consistent results
+        for node in [self.node2, self.node3]:
+            assert_all(self.exclusive_cql_connection(node),
+                       "SELECT * FROM %s.%s" % (self.keyspace, self.table),
+                       [[1, 1, 1],
+                        [1, 2, 2]],
+                       cl=ConsistencyLevel.QUORUM)
+
+    @pytest.mark.no_vnodes
+    def test_srp(self):
+        """
+        A LIMIT 1 read must skip a row whose delete never reached node2.
+
+        Row (1, 1) is deleted while writes to node2 are stopped, then node1 is
+        taken down; the QUORUM read through node3 has to return row (1, 2).
+        (The test name presumably stands for short read protection.)
+        """
+        self._assert_no_sstables_on_any_node()
+
+        self.insert_row(1, 1, 1)
+        self.insert_row(1, 2, 2)
+
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.delete_row(1, 1, node=self.node1)
+
+        # Stop reads from the node that received the delete
+        self.node1.stop()
+
+        assert_all(self.exclusive_cql_connection(self.node3),
+                   "SELECT * FROM %s.%s LIMIT 1" % (self.keyspace, self.table),
+                   [[1, 2, 2]],
+                   cl=ConsistencyLevel.QUORUM)
+
+    @pytest.mark.no_vnodes
+    def test_transient_full_merge_read_with_delete_transient_coordinator(self):
+        """ Same as the full coordinator variant, but the read is coordinated by node3 """
+        self._test_transient_full_merge_read_with_delete(self.node3)
+
+    @pytest.mark.no_vnodes
+    def test_transient_full_merge_read_with_delete_full_coordinator(self):
+        """ Same as the transient coordinator variant, but the read is coordinated by node2 """
+        self._test_transient_full_merge_read_with_delete(self.node2)
+
+    def _test_transient_full_merge_read_with_delete(self, coordinator):
+        """
+        A delete issued while writes to node2 are stopped must still be
+        honoured by a QUORUM read through coordinator once node1 is down.
+        """
+        self._assert_no_sstables_on_any_node()
+
+        self.insert_row(1, 1, 1)
+        self.insert_row(1, 2, 2)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.delete_row(1, 2)
+
+        self.assert_local_rows(self.node3,
+                               [])
+        # Stop reads from the node that received the delete
+        self.node1.stop()
+
+        assert_all(self.exclusive_cql_connection(coordinator),
+                   "SELECT * FROM %s.%s" % (self.keyspace, self.table),
+                   [[1, 1, 1]],
+                   cl=ConsistencyLevel.QUORUM)
+
+    def _test_speculative_write_repair_cycle(self, primary_range, optimized_repair, repair_coordinator,
+                                             expect_node3_data):
+        """
+        if one of the full replicas is not available, data should be written to the transient replica,
+        but removed after incremental repair
+
+        primary_range adds '-pr' and optimized_repair adds '-os' to the repair
+        command, which is run through repair_coordinator. expect_node3_data
+        states whether node3 should still hold sstables after the repair.
+        """
+        self._assert_no_sstables_on_any_node()
+
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        with self.table_metrics(self.node1) as tm1, \
+                self.table_metrics(self.node2) as tm2, \
+                self.table_metrics(self.node3) as tm3:
+            assert tm1.write_count == 0
+            assert tm2.write_count == 0
+            assert tm3.write_count == 0
+            self.insert_row(1, 1, 1)
+            assert tm1.write_count == 1
+            assert tm2.write_count == 0
+            assert tm3.write_count == 1
+
+        self.assert_has_sstables(self.node1, flush=True)
+        self.assert_has_no_sstables(self.node2, flush=True)
+        self.assert_has_sstables(self.node3, flush=True)
+
+        repair_opts = ['repair', self.keyspace]
+        if primary_range:
+            repair_opts.append('-pr')
+        if optimized_repair:
+            repair_opts.append('-os')
+        repair_coordinator.nodetool(' '.join(repair_opts))
+
+        self.assert_has_sstables(self.node1, compact=True)
+        self.assert_has_sstables(self.node2, compact=True)
+        if expect_node3_data:
+            self.assert_has_sstables(self.node3, compact=True)
+        else:
+            self.assert_has_no_sstables(self.node3, compact=True)
+
+    @pytest.mark.no_vnodes
+    def test_speculative_write_repair_cycle(self):
+        """ incremental repair from full replica should remove data on node3 """
+        self._test_speculative_write_repair_cycle(primary_range=False,
+                                                  optimized_repair=False,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_primary_range_repair(self):
+        """ primary range incremental repair from full replica should remove data on node3 """
+        self._test_speculative_write_repair_cycle(primary_range=True,
+                                                  optimized_repair=False,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_optimized_primary_range_repair(self):
+        """ optimized primary range incremental repair from full replica should remove data on node3 """
+        self._test_speculative_write_repair_cycle(primary_range=True,
+                                                  optimized_repair=True,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_transient_incremental_repair(self):
+        """ transiently replicated ranges should be skipped when coordinating repairs """
+        # NOTE(review): these arguments are identical to test_primary_range_repair
+        # (coordinator is node1) - confirm which coordinator was intended here.
+        self._test_speculative_write_repair_cycle(primary_range=True,
+                                                  optimized_repair=False,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_cheap_quorums(self):
+        """ writes shouldn't make it to transient nodes """
+        session = self.exclusive_cql_connection(self.node1)
+        self._assert_no_sstables_on_any_node()
+
+        with self.table_metrics(self.node1) as tm1, \
+                self.table_metrics(self.node2) as tm2, \
+                self.table_metrics(self.node3) as tm3:
+            assert tm1.write_count == 0
+            assert tm2.write_count == 0
+            assert tm3.write_count == 0
+            self.insert_row(1, 1, 1, session=session)
+            assert tm1.write_count == 1
+            assert tm2.write_count == 1
+            assert tm3.write_count == 0
+
+    @pytest.mark.no_vnodes
+    def test_speculative_write(self):
+        """ if a full replica isn't responding, we should send the write to the transient replica """
+        session = self.exclusive_cql_connection(self.node1)
+        self.node2.byteman_submit(['./byteman/slow_writes.btm'])
+
+        self.insert_row(1, 1, 1, session=session)
+        self.assert_local_rows(self.node1, [[1, 1, 1]])
+        self.assert_local_rows(self.node2, [])
+        self.assert_local_rows(self.node3, [[1, 1, 1]])
+
+    def _test_full_repair(self, repair_coordinator):
+        """ Write one row, run 'repair -full' through repair_coordinator and check node3 stays empty """
+        session = self.exclusive_cql_connection(self.node1)
+        self._assert_no_sstables_on_any_node()
+
+        self.insert_row(1, 1, 1, session=session)
+        self._assert_only_full_replicas_have_sstables()
+
+        repair_coordinator.nodetool(' '.join(['repair', self.keyspace, '-full']))
+        self._assert_only_full_replicas_have_sstables()
+
+    @pytest.mark.no_vnodes
+    def test_full_repair_from_full_replica(self):
+        """ full repairs shouldn't replicate data to transient replicas """
+        self._test_full_repair(self.node1)
+
+    @pytest.mark.no_vnodes
+    def test_full_repair_from_transient_replica(self):
+        """ full repairs shouldn't replicate data to transient replicas """
+        self._test_full_repair(self.node3)
+
+    @pytest.mark.skip(reason="Doesn't test quite the right combination of forbidden RF changes right now")
+    def test_keyspace_rf_changes(self):
+        """ they should throw an exception """
+        session = self.exclusive_cql_connection(self.node1)
+        assert self.replication_factor() == '3/1'
+        replication = OrderedDict()
+        replication['class'] = 'NetworkTopologyStrategy'
+        replication['datacenter1'] = '5/2'
+        replication_cql = ', '.join("'%s': '%s'" % (k, v) for k, v in replication.items())
+        with pytest.raises(ConfigurationException):
+            session.execute("ALTER KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_cql))
+
+    def test_disabled_read_repair(self):
+        """ shouldn't allow creating tables without read repair disabled """
+        session = self.exclusive_cql_connection(self.node1)
+        with pytest.raises(InvalidRequest):
+            session.execute("CREATE TABLE %s.tbl2 (pk int, ck int, value int, PRIMARY KEY (pk, ck))"
+                            % self.keyspace)
+
+        with pytest.raises(InvalidRequest):
+            session.execute("ALTER TABLE %s.%s WITH read_repair = 'BLOCKING'"
+                            % (self.keyspace, self.table))
+
+
+class TestTransientReplicationSpeculativeQueries(TransientReplicationBase):
+    """
+    QUORUM reads with different speculative_retry settings on the test table.
+
+    The keyspace and table are created by the inherited setup_schema; each
+    test then alters the table's speculative_retry option.
+    """
+
+    def _test_speculate(self, speculative_retry):
+        """
+        Set speculative_retry on the table, write one row normally and one row
+        while writes to node2 are stopped, then expect a QUORUM read through
+        every node to return both rows.
+        """
+        session = self.exclusive_cql_connection(self.node1)
+        session.execute("ALTER TABLE %s.%s WITH speculative_retry = '%s';"
+                        % (self.keyspace, self.table, speculative_retry))
+        self.insert_row(1, 1, 1)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.insert_row(1, 2, 2)
+
+        for node in self.nodes:
+            assert_all(self.exclusive_cql_connection(node),
+                       "SELECT * FROM %s.%s WHERE pk = 1" % (self.keyspace, self.table),
+                       [[1, 1, 1],
+                        [1, 2, 2]],
+                       cl=ConsistencyLevel.QUORUM)
+
+    @pytest.mark.no_vnodes
+    def test_always_speculate(self):
+        """ With speculative_retry = 'ALWAYS', QUORUM reads from any node should return both rows """
+        self._test_speculate('ALWAYS')
+
+    @pytest.mark.no_vnodes
+    def test_custom_speculate(self):
+        """ With speculative_retry = '99.99PERCENTILE', QUORUM reads from any node should return both rows """
+        self._test_speculate('99.99PERCENTILE')
+
+class TestMultipleTransientNodes(TransientReplicationBase):
+    """ Variant of the base setup with five nodes and a '5/2' replication setting """
+
+    def populate(self):
+        """ Create five cluster nodes (byteman enabled), one per entry of self.tokens """
+        self.cluster.populate(5, tokens=self.tokens, debug=True, install_byteman=True)
+
+    def set_nodes(self):
+        """ Expose the started nodes as node1..node5; no byteman rule is installed here """
+        self.node1, self.node2, self.node3, self.node4, self.node5 = self.nodes
+
+    def replication_factor(self):
+        """ Replication setting applied to datacenter1 by setup_schema """
+        return '5/2'
+
+    def tokens(self):
+        """ Initial tokens, one per node """
+        return [0, 1, 2, 3, 4]
+
+    @pytest.mark.no_vnodes
+    def test_transient_full_merge_read(self):
+        """ When reading, transient replica should serve a missing read """
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.insert_row(1, 1, 1)
+        # Stop writes to node2; node1 and node3 keep accepting them
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.insert_row(1, 2, 2)
+
+        both_rows = [[1, 1, 1],
+                     [1, 2, 2]]
+        self.assert_local_rows(self.node1, both_rows)
+        self.assert_local_rows(self.node2,
+                               [[1, 1, 1]])
+        self.assert_local_rows(self.node3, both_rows)
+        # node4 and node5 only received the write that node2 missed
+        self.assert_local_rows(self.node4,
+                               [[1, 2, 2]])
+        self.assert_local_rows(self.node5,
+                               [[1, 2, 2]])
+        # Stop reads from one of the nodes that hold the second row
+        self.node1.stop()
+
+        # Whether we're reading from the full node or from the transient node, we should get consistent results
+        for node in [self.node2, self.node3, self.node4, self.node5]:
+            assert_all(self.exclusive_cql_connection(node),
+                       "SELECT * FROM %s.%s" % (self.keyspace, self.table),
+                       both_rows,
+                       cl=ConsistencyLevel.QUORUM)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to