Repository: cassandra-dtest Updated Branches: refs/heads/master 2548ec6e6 -> d291b2b90
Stream entire SSTables when possible patch by Dinesh Joshi; reviewed by Aleksey Yeschenko and Ariel Weisberg for CASSANDRA-14566 Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/d291b2b9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/d291b2b9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/d291b2b9 Branch: refs/heads/master Commit: d291b2b90326c62c2df8f49098c6deb915c16460 Parents: 2548ec6 Author: Dinesh A. Joshi <[email protected]> Authored: Sun Jul 1 23:41:11 2018 -0700 Committer: Aleksey Yeschenko <[email protected]> Committed: Fri Jul 27 18:57:08 2018 +0100 ---------------------------------------------------------------------- disk_balance_test.py | 9 +++++ dtest.py | 11 ++++- streaming_test.py | 100 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/d291b2b9/disk_balance_test.py ---------------------------------------------------------------------- diff --git a/disk_balance_test.py b/disk_balance_test.py index a32ced9..2b28fd5 100644 --- a/disk_balance_test.py +++ b/disk_balance_test.py @@ -24,6 +24,15 @@ class TestDiskBalance(Tester): @jira_ticket CASSANDRA-6696 """ + @pytest.fixture(scope='function', autouse=True) + def fixture_set_cluster_settings(self, fixture_dtest_setup): + cluster = fixture_dtest_setup.cluster + cluster.schema_event_refresh_window = 0 + + # CASSANDRA-14556 should be disabled if you need directories to be perfectly balanced. + if cluster.version() >= '4.0': + cluster.set_configuration_options({'stream_entire_sstables': 'false'}) + def test_disk_balance_stress(self): cluster = self.cluster if self.dtest_config.use_vnodes: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/d291b2b9/dtest.py ---------------------------------------------------------------------- diff --git a/dtest.py b/dtest.py index f76789f..4dcbd39 100644 --- a/dtest.py +++ b/dtest.py @@ -283,7 +283,13 @@ def get_eager_protocol_version(cassandra_version): # We default to UTF8Type because it's simpler to use in tests def create_cf(session, name, key_type="varchar", speculative_retry=None, read_repair=None, compression=None, - gc_grace=None, columns=None, validation="UTF8Type", compact_storage=False): + gc_grace=None, columns=None, validation="UTF8Type", compact_storage=False, compaction_strategy='SizeTieredCompactionStrategy'): + + compaction_fragment = "compaction = {'class': '%s', 'enabled': 'true'}" + if compaction_strategy == '': + compaction_fragment = compaction_fragment % 'SizeTieredCompactionStrategy' + else: + compaction_fragment = compaction_fragment % compaction_strategy additional_columns = "" if columns is not None: @@ -295,6 +301,9 @@ def create_cf(session, name, key_type="varchar", speculative_retry=None, read_re else: query = 'CREATE COLUMNFAMILY %s (key %s PRIMARY KEY%s) WITH comment=\'test cf\'' % (name, key_type, additional_columns) + if compaction_fragment is not None: + query = '%s AND %s' % (query, compaction_fragment) + if compression is not None: query = '%s AND compression = { \'sstable_compression\': \'%sCompressor\' }' % (query, compression) else: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/d291b2b9/streaming_test.py ---------------------------------------------------------------------- diff --git a/streaming_test.py b/streaming_test.py new file mode 100644 index 0000000..bdb8b14 --- /dev/null +++ b/streaming_test.py @@ -0,0 +1,100 @@ +import logging +import operator + +import pytest +from cassandra import ConsistencyLevel +from pytest import mark + +from dtest import Tester, create_ks, create_cf +from tools.data import insert_c1c2 + +since = pytest.mark.since +logger = logging.getLogger(__name__) + +opmap = { + operator.eq: "==", + operator.gt: ">", + operator.lt: "<", + operator.ne: "!=", + operator.ge: ">=", + operator.le: "<=" +} + + +class TestStreaming(Tester): + + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + # This one occurs when trying to send the migration to a + # node that hasn't started yet, and when it does, it gets + # replayed and everything is fine. + r'Can\'t send migration request: node.*is down', + # ignore streaming error during bootstrap + r'Exception encountered during startup', + r'Streaming error occurred' + ) + + def _test_streaming(self, op_zerocopy, op_partial, num_partial, num_zerocopy, + compaction_strategy='LeveledCompactionStrategy', num_keys=1000, rf=3, num_nodes=3): + keys = num_keys + + cluster = self.cluster + tokens = cluster.balanced_tokens(num_nodes) + cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.PropertyFileSnitch'}) + cluster.set_configuration_options(values={'num_tokens': 1}) + + cluster.populate(num_nodes) + nodes = cluster.nodelist() + + for i in range(0, len(nodes)): + nodes[i].set_configuration_options(values={'initial_token': tokens[i]}) + + cluster.start(wait_for_binary_proto=True) + + session = self.patient_cql_connection(nodes[0]) + + create_ks(session, name='ks2', rf=rf) + + create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'}, + compaction_strategy=compaction_strategy) + insert_c1c2(session, n=keys, consistency=ConsistencyLevel.ALL) + + session_n2 = self.patient_exclusive_cql_connection(nodes[1]) + session_n2.execute("TRUNCATE system.available_ranges;") + + mark = nodes[1].mark_log() + nodes[1].nodetool('rebuild -ks ks2') + + nodes[1].watch_log_for('Completed submission of build tasks', filename='debug.log', timeout=120) + zerocopy_streamed_sstable = len( + nodes[1].grep_log('.*CassandraEntireSSTableStreamReader.*?Finished receiving Data.*', filename='debug.log', + from_mark=mark)) + partial_streamed_sstable = len( + nodes[1].grep_log('.*CassandraStreamReader.*?Finished receiving file.*', filename='debug.log', + from_mark=mark)) + + assert op_zerocopy(zerocopy_streamed_sstable, num_zerocopy), "%s %s %s" % (num_zerocopy, opmap.get(op_zerocopy), + zerocopy_streamed_sstable) + assert op_partial(partial_streamed_sstable, num_partial), "%s %s %s" % (num_partial, op_partial, + partial_streamed_sstable) + + @since('4.0') + def test_zerocopy_streaming(self): + self._test_streaming(op_zerocopy=operator.gt, op_partial=operator.eq, num_zerocopy=1, num_partial=0, + num_nodes=2, rf=2) + + @since('4.0') + def test_zerocopy_streaming_leveled_compaction(self): + self._test_streaming(op_zerocopy=operator.gt, op_partial=operator.gt, num_zerocopy=1, num_partial=1, rf=2) + + @mark.xfail(reason="Not implemented yet. Should be functional after CASSANDRA-10540, CASSANDRA-14586 are fixed.") + @since('4.0') + def test_zerocopy_streaming_size_tiered_compaction(self): + self._test_streaming(op_zerocopy=operator.gt, op_partial=operator.gt, num_zerocopy=1, num_partial=1, rf=2, + num_nodes=3, compaction_strategy='SizeTieredCompactionStrategy') + + @since('4.0') + def test_zerocopy_streaming_no_replication(self): + self._test_streaming(op_zerocopy=operator.eq, op_partial=operator.eq, num_zerocopy=0, num_partial=0, rf=1, + num_nodes=3) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
