Add incremental repair support for --hosts, --force, and subrange repair

Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13818


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f8bc9da
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f8bc9da
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f8bc9da

Branch: refs/heads/master
Commit: 2f8bc9da4e06ee26fd4ab2c35f416220c887a951
Parents: b76a066
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Tue Aug 29 10:40:46 2017 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Sat Sep 30 08:43:58 2017 -0700

----------------------------------------------------------------------
 repair_tests/incremental_repair_test.py | 103 +++++++++++++++++++++++++++
 1 file changed, 103 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f8bc9da/repair_tests/incremental_repair_test.py
----------------------------------------------------------------------
diff --git a/repair_tests/incremental_repair_test.py 
b/repair_tests/incremental_repair_test.py
index b081d44..b99cf9e 100644
--- a/repair_tests/incremental_repair_test.py
+++ b/repair_tests/incremental_repair_test.py
@@ -7,6 +7,7 @@ from uuid import UUID, uuid1
 
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
+from cassandra.metadata import Murmur3Token
 from ccmlib.common import is_win
 from ccmlib.node import Node, ToolError
 from nose.plugins.attrib import attr
@@ -74,6 +75,13 @@ class TestIncRepair(Tester):
         self.assertTrue(all([t.repaired > 0 for t in data]), '{}'.format(data))
         self.assertTrue(all([t.pending_id is None for t in data]), 
'{}'.format(data))
 
+    def assertRepairedAndUnrepaired(self, node, keyspace):
+        """ Checks that a node has both repaired and unrepaired sstables for a 
given keyspace """
+        data = self._get_repaired_data(node, keyspace)
+        self.assertTrue(any([t.repaired > 0 for t in data]), '{}'.format(data))
+        self.assertTrue(any([t.repaired == 0 for t in data]), 
'{}'.format(data))
+        self.assertTrue(all([t.pending_id is None for t in data]), 
'{}'.format(data))
+
     @since('4.0')
     def consistent_repair_test(self):
         cluster = self.cluster
@@ -758,3 +766,98 @@ class TestIncRepair(Tester):
         for node in self.cluster.nodelist():
             result = node.repair(options=['ks', '--validate'])
             self.assertIn("Repaired data is in sync", result.stdout)
+
+    @since('4.0')
+    def force_test(self):
+        """ 
+        forcing an incremental repair should incrementally repair any nodes 
+        that are up, but should not promote the sstables to repaired 
+        """
+        cluster = self.cluster
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+
+        session = self.patient_exclusive_cql_connection(node3)
+        session.execute("CREATE KEYSPACE ks WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
+        session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i))
+
+        node2.stop()
+
+        # repair should fail because node2 is down
+        with self.assertRaises(ToolError):
+            node1.repair(options=['ks'])
+
+        # run with force flag
+        node1.repair(options=['ks', '--force'])
+
+        # ... and verify nothing was promoted to repaired
+        self.assertNoRepairedSSTables(node1, 'ks')
+        self.assertNoRepairedSSTables(node2, 'ks')
+
+    @since('4.0')
+    def hosts_test(self):
+        """ 
+        running an incremental repair with hosts specified should 
incrementally repair 
+        the given nodes, but should not promote the sstables to repaired 
+        """
+        cluster = self.cluster
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+
+        session = self.patient_exclusive_cql_connection(node3)
+        session.execute("CREATE KEYSPACE ks WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
+        session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i))
+
+        # run with force flag
+        node1.repair(options=['ks', '-hosts', ','.join([node1.address(), 
node2.address()])])
+
+        # ... and verify nothing was promoted to repaired
+        self.assertNoRepairedSSTables(node1, 'ks')
+        self.assertNoRepairedSSTables(node2, 'ks')
+
+    @since('4.0')
+    def subrange_test(self):
+        """ 
+        running an incremental repair with hosts specified should 
incrementally repair 
+        the given nodes, but should not promote the sstables to repaired 
+        """
+        cluster = self.cluster
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': 
False,
+                                                  'num_tokens': 1,
+                                                  
'commitlog_sync_period_in_ms': 500,
+                                                  'partitioner': 
'org.apache.cassandra.dht.Murmur3Partitioner'})
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+
+        session = self.patient_exclusive_cql_connection(node3)
+        session.execute("CREATE KEYSPACE ks WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
+        session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i))
+
+        for node in cluster.nodelist():
+            node.flush()
+            self.assertNoRepairedSSTables(node, 'ks')
+
+        # only repair the partition k=0
+        token = Murmur3Token.from_key(str(bytearray([0,0,0,0])))
+        # import ipdb; ipdb.set_trace()
+        # run with force flag
+        node1.repair(options=['ks', '-st', str(token.value - 1), '-et', 
str(token.value)])
+
+        # verify we have a mix of repaired and unrepaired sstables
+        self.assertRepairedAndUnrepaired(node1, 'ks')
+        self.assertRepairedAndUnrepaired(node2, 'ks')
+        self.assertRepairedAndUnrepaired(node3, 'ks')


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to