Repository: cassandra-dtest
Updated Branches:
  refs/heads/master 2548ec6e6 -> d291b2b90


Stream entire SSTables when possible

patch by Dinesh Joshi; reviewed by Aleksey Yeschenko and Ariel Weisberg
for CASSANDRA-14566


Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/d291b2b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/d291b2b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/d291b2b9

Branch: refs/heads/master
Commit: d291b2b90326c62c2df8f49098c6deb915c16460
Parents: 2548ec6
Author: Dinesh A. Joshi <[email protected]>
Authored: Sun Jul 1 23:41:11 2018 -0700
Committer: Aleksey Yeschenko <[email protected]>
Committed: Fri Jul 27 18:57:08 2018 +0100

----------------------------------------------------------------------
 disk_balance_test.py |   9 +++++
 dtest.py             |  11 ++++-
 streaming_test.py    | 100 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 119 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/d291b2b9/disk_balance_test.py
----------------------------------------------------------------------
diff --git a/disk_balance_test.py b/disk_balance_test.py
index a32ced9..2b28fd5 100644
--- a/disk_balance_test.py
+++ b/disk_balance_test.py
@@ -24,6 +24,15 @@ class TestDiskBalance(Tester):
     @jira_ticket CASSANDRA-6696
     """
 
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_set_cluster_settings(self, fixture_dtest_setup):
+        cluster = fixture_dtest_setup.cluster
+        cluster.schema_event_refresh_window = 0
+
+        # CASSANDRA-14556 should be disabled if you need directories to be perfectly balanced.
+        if cluster.version() >= '4.0':
+            cluster.set_configuration_options({'stream_entire_sstables': 'false'})
+
     def test_disk_balance_stress(self):
         cluster = self.cluster
         if self.dtest_config.use_vnodes:
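
The fixture above runs automatically before every test in the class (autouse=True) and opts the cluster out of the new zero-copy path, since streaming whole sstables can leave data directories unevenly sized. A minimal sketch of the same opt-out factored into a reusable helper, assuming the ccm-backed cluster object used throughout this repository; the name disable_zerocopy_streaming is hypothetical and not part of this patch:

    def disable_zerocopy_streaming(cluster):
        """Turn off CASSANDRA-14556 zero-copy streaming on 4.0+ clusters.

        Useful for tests that assert per-directory balance, where streaming
        entire sstables would skew the distribution.
        """
        if cluster.version() >= '4.0':
            cluster.set_configuration_options({'stream_entire_sstables': 'false'})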

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/d291b2b9/dtest.py
----------------------------------------------------------------------
diff --git a/dtest.py b/dtest.py
index f76789f..4dcbd39 100644
--- a/dtest.py
+++ b/dtest.py
@@ -283,7 +283,13 @@ def get_eager_protocol_version(cassandra_version):
 
 # We default to UTF8Type because it's simpler to use in tests
 def create_cf(session, name, key_type="varchar", speculative_retry=None, read_repair=None, compression=None,
-              gc_grace=None, columns=None, validation="UTF8Type", compact_storage=False):
+              gc_grace=None, columns=None, validation="UTF8Type", compact_storage=False, compaction_strategy='SizeTieredCompactionStrategy'):
+
+    compaction_fragment = "compaction = {'class': '%s', 'enabled': 'true'}"
+    if compaction_strategy == '':
+        compaction_fragment = compaction_fragment % 'SizeTieredCompactionStrategy'
+    else:
+        compaction_fragment = compaction_fragment % compaction_strategy
 
     additional_columns = ""
     if columns is not None:
@@ -295,6 +301,9 @@ def create_cf(session, name, key_type="varchar", speculative_retry=None, read_re
     else:
         query = 'CREATE COLUMNFAMILY %s (key %s PRIMARY KEY%s) WITH comment=\'test cf\'' % (name, key_type, additional_columns)
 
+    if compaction_fragment is not None:
+        query = '%s AND %s' % (query, compaction_fragment)
+
     if compression is not None:
         query = '%s AND compression = { \'sstable_compression\': \'%sCompressor\' }' % (query, compression)
     else:
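
A short usage sketch of the new compaction_strategy keyword, together with the CQL it should produce given the compaction_fragment template above (table and column names here are illustrative, taken from streaming_test.py below; the spelled-out statement is my reading of the query-building code, not output captured from a run):

    # Illustrative call; mirrors how the streaming test exercises create_cf.
    create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'},
              compaction_strategy='LeveledCompactionStrategy')

    # Expected to issue roughly:
    #   CREATE COLUMNFAMILY cf (key varchar PRIMARY KEY, c1 text, c2 text)
    #     WITH comment='test cf'
    #     AND compaction = {'class': 'LeveledCompactionStrategy', 'enabled': 'true'}
    #     ... plus the usual compression and related clauses added further down.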

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/d291b2b9/streaming_test.py
----------------------------------------------------------------------
diff --git a/streaming_test.py b/streaming_test.py
new file mode 100644
index 0000000..bdb8b14
--- /dev/null
+++ b/streaming_test.py
@@ -0,0 +1,100 @@
+import logging
+import operator
+
+import pytest
+from cassandra import ConsistencyLevel
+from pytest import mark
+
+from dtest import Tester, create_ks, create_cf
+from tools.data import insert_c1c2
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+opmap = {
+    operator.eq: "==",
+    operator.gt: ">",
+    operator.lt: "<",
+    operator.ne: "!=",
+    operator.ge: ">=",
+    operator.le: "<="
+}
+
+
+class TestStreaming(Tester):
+
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            # This one occurs when trying to send the migration to a
+            # node that hasn't started yet, and when it does, it gets
+            # replayed and everything is fine.
+            r'Can\'t send migration request: node.*is down',
+            # ignore streaming error during bootstrap
+            r'Exception encountered during startup',
+            r'Streaming error occurred'
+        )
+
+    def _test_streaming(self, op_zerocopy, op_partial, num_partial, num_zerocopy,
+                        compaction_strategy='LeveledCompactionStrategy', num_keys=1000, rf=3, num_nodes=3):
+        keys = num_keys
+
+        cluster = self.cluster
+        tokens = cluster.balanced_tokens(num_nodes)
+        cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.PropertyFileSnitch'})
+        cluster.set_configuration_options(values={'num_tokens': 1})
+
+        cluster.populate(num_nodes)
+        nodes = cluster.nodelist()
+
+        for i in range(0, len(nodes)):
+            nodes[i].set_configuration_options(values={'initial_token': tokens[i]})
+
+        cluster.start(wait_for_binary_proto=True)
+
+        session = self.patient_cql_connection(nodes[0])
+
+        create_ks(session, name='ks2', rf=rf)
+
+        create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'},
+                  compaction_strategy=compaction_strategy)
+        insert_c1c2(session, n=keys, consistency=ConsistencyLevel.ALL)
+
+        session_n2 = self.patient_exclusive_cql_connection(nodes[1])
+        session_n2.execute("TRUNCATE system.available_ranges;")
+
+        mark = nodes[1].mark_log()
+        nodes[1].nodetool('rebuild -ks ks2')
+
+        nodes[1].watch_log_for('Completed submission of build tasks', filename='debug.log', timeout=120)
+        zerocopy_streamed_sstable = len(
+            nodes[1].grep_log('.*CassandraEntireSSTableStreamReader.*?Finished receiving Data.*', filename='debug.log',
+                              from_mark=mark))
+        partial_streamed_sstable = len(
+            nodes[1].grep_log('.*CassandraStreamReader.*?Finished receiving file.*', filename='debug.log',
+                              from_mark=mark))
+
+        assert op_zerocopy(zerocopy_streamed_sstable, num_zerocopy), "%s %s %s" % (num_zerocopy, opmap.get(op_zerocopy),
+                                                                                   zerocopy_streamed_sstable)
+        assert op_partial(partial_streamed_sstable, num_partial), "%s %s %s" % (num_partial, op_partial,
+                                                                                partial_streamed_sstable)
+
+    @since('4.0')
+    def test_zerocopy_streaming(self):
+        self._test_streaming(op_zerocopy=operator.gt, op_partial=operator.eq, num_zerocopy=1, num_partial=0,
+                             num_nodes=2, rf=2)
+
+    @since('4.0')
+    def test_zerocopy_streaming_leveled_compaction(self):
+        self._test_streaming(op_zerocopy=operator.gt, op_partial=operator.gt, num_zerocopy=1, num_partial=1, rf=2)
+
+    @mark.xfail(reason="Not implemented yet. Should be functional after CASSANDRA-10540, CASSANDRA-14586 are fixed.")
+    @since('4.0')
+    def test_zerocopy_streaming_size_tiered_compaction(self):
+        self._test_streaming(op_zerocopy=operator.gt, op_partial=operator.gt, num_zerocopy=1, num_partial=1, rf=2,
+                             num_nodes=3, compaction_strategy='SizeTieredCompactionStrategy')
+
+    @since('4.0')
+    def test_zerocopy_streaming_no_replication(self):
+        self._test_streaming(op_zerocopy=operator.eq, op_partial=operator.eq, num_zerocopy=0, num_partial=0, rf=1,
+                             num_nodes=3)
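
The assertions in this test hinge on two distinct debug.log markers written after the rebuild: lines from CassandraEntireSSTableStreamReader indicate whole-sstable (zero-copy) transfers, while lines from CassandraStreamReader indicate the pre-4.0 partial-file path. A hedged sketch of just that counting step, pulled out into a standalone helper (count_stream_types is a hypothetical name, not part of this patch; grep_log is the same ccm call the test uses):

    def count_stream_types(node, from_mark):
        """Count zero-copy vs. partial sstable stream completions since from_mark."""
        zerocopy = len(node.grep_log('.*CassandraEntireSSTableStreamReader.*?Finished receiving Data.*',
                                     filename='debug.log', from_mark=from_mark))
        partial = len(node.grep_log('.*CassandraStreamReader.*?Finished receiving file.*',
                                    filename='debug.log', from_mark=from_mark))
        return zerocopy, partial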


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
