This is an automated email from the ASF dual-hosted git repository. jlewandowski pushed a commit to branch ds-trunk-5.0--2024-07-24 in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git
commit 3aee7310c81f30a0b4d47ff00f669c343e4969e3 Author: dan jatnieks <[email protected]> AuthorDate: Tue Jun 8 07:17:08 2021 -0700 STAR-543: Port guardrail tests and changes (#19) Co-authored-by: Aleksandr Sorokoumov <[email protected]> (cherry picked from commit fd2b1c3d96e78c1530befd8fce5a75d5f37a9894) (cherry picked from commit 66e6b7210d92cc97dc27d30808e5f53e491c80d9) (cherry picked from commit 70601302cbd4c0fddad58d183a96055ff3b8a87b) (cherry picked from commit ac2f0f6be80f3839bc0435789f629c2e746b488c) (cherry picked from commit 5d2ce35e9b2a594877829abe117a34f65a98dfca) (cherry picked from commit e86263e8fab346406287885b9088afdc269ae62f) (cherry picked from commit 47c0399e46152c74fe68cce79d5c39970215dafd) --- byteman/guardrails/disk_usage_full.btm | 8 +++ byteman/guardrails/disk_usage_stuffed.btm | 8 +++ client_request_metrics_test.py | 3 +- compaction_test.py | 10 +++- cqlsh_tests/test_cqlsh.py | 7 ++- cqlsh_tests/test_cqlsh_copy.py | 26 ++++++-- dtest_setup.py | 4 ++ guardrails_test.py | 99 +++++++++++++++++++++++++++++++ paging_test.py | 26 +++++--- pushed_notifications_test.py | 33 ++++++----- read_failures_test.py | 17 ++++-- tools/misc.py | 11 ++++ 12 files changed, 215 insertions(+), 37 deletions(-) diff --git a/byteman/guardrails/disk_usage_full.btm b/byteman/guardrails/disk_usage_full.btm new file mode 100644 index 00000000..bbdf8ddc --- /dev/null +++ b/byteman/guardrails/disk_usage_full.btm @@ -0,0 +1,8 @@ +RULE return FULL disk usage +CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor +METHOD getState +AT EXIT +IF TRUE +DO + return org.apache.cassandra.service.disk.usage.DiskUsageState.FULL; +ENDRULE \ No newline at end of file diff --git a/byteman/guardrails/disk_usage_stuffed.btm b/byteman/guardrails/disk_usage_stuffed.btm new file mode 100644 index 00000000..32562113 --- /dev/null +++ b/byteman/guardrails/disk_usage_stuffed.btm @@ -0,0 +1,8 @@ +RULE return STUFFED disk usage +CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor +METHOD getState +AT EXIT +IF TRUE +DO + return org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED; +ENDRULE \ No newline at end of file diff --git a/client_request_metrics_test.py b/client_request_metrics_test.py index 7286ccbc..3fe26683 100644 --- a/client_request_metrics_test.py +++ b/client_request_metrics_test.py @@ -42,7 +42,7 @@ class TestClientRequestMetrics(Tester): fixture_dtest_setup.ignore_log_patterns = ( 'Testing write failures', # The error to simulate a write failure 'ERROR WRITE_FAILURE', # Logged in DEBUG mode for write failures - f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstones during query" # Caused by the read failure tests + f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} (tombstones|tombstone rows) during query" # Caused by the read failure tests ) def setup_once(self): @@ -50,6 +50,7 @@ class TestClientRequestMetrics(Tester): cluster.set_configuration_options({'read_request_timeout_in_ms': 3000, 'write_request_timeout_in_ms': 3000, 'phi_convict_threshold': 12, + 'tombstone_warn_threshold': -1, 'tombstone_failure_threshold': TOMBSTONE_FAILURE_THRESHOLD, 'enable_materialized_views': 'true'}) cluster.populate(2, debug=True) diff --git a/compaction_test.py b/compaction_test.py index b37db3ec..5a0d69b0 100644 --- a/compaction_test.py +++ b/compaction_test.py @@ -355,7 +355,10 @@ class TestCompaction(Tester): Check that we log a warning when the partition size is bigger than compaction_large_partition_warning_threshold_mb """ cluster = self.cluster - cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1}) + if self.supports_guardrails: + cluster.set_configuration_options({'guardrails': {'partition_size_warn_threshold_in_mb': 1}}) + else: + cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1}) cluster.populate(1).start() [node] = cluster.nodelist() @@ -377,7 +380,10 @@ class TestCompaction(Tester): node.nodetool('compact ks large') verb = 'Writing' if self.cluster.version() > '2.2' else 'Compacting' sizematcher = '\d+ bytes' if self.cluster.version() < LooseVersion('3.6') else '\d+\.\d{3}(K|M|G)iB' - node.watch_log_for('{} large partition ks/large:user \({}'.format(verb, sizematcher), from_mark=mark, timeout=180) + log_message = '{} large partition ks/large:user \({}'.format(verb, sizematcher) + if self.supports_guardrails: + log_message = "Detected partition 'user' in ks.large of size 2MB is greater than the maximum recommended size \(1MB\)" + node.watch_log_for(log_message, from_mark=mark, timeout=180) ret = list(session.execute("SELECT properties from ks.large where userid = 'user'")) assert_length_equal(ret, 1) diff --git a/cqlsh_tests/test_cqlsh.py b/cqlsh_tests/test_cqlsh.py index 92e6b30c..5eea39ae 100644 --- a/cqlsh_tests/test_cqlsh.py +++ b/cqlsh_tests/test_cqlsh.py @@ -2086,8 +2086,11 @@ Tracing session:""") """ max_partitions_per_batch = 5 self.cluster.populate(3) - self.cluster.set_configuration_options({ - 'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)}) + + config_opts = {'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)} + if self.supports_guardrails: + config_opts = {"guardrails": config_opts} + self.cluster.set_configuration_options(config_opts) self.cluster.start() diff --git a/cqlsh_tests/test_cqlsh_copy.py b/cqlsh_tests/test_cqlsh_copy.py index 2f4725f2..458804c7 100644 --- a/cqlsh_tests/test_cqlsh_copy.py +++ b/cqlsh_tests/test_cqlsh_copy.py @@ -2481,8 +2481,12 @@ class TestCqlshCopy(Tester): @jira_ticket CASSANDRA-9302 """ + config_opts = {'batch_size_warn_threshold_in_kb': '10'} + if self.supports_guardrails: # batch size thresholds moved to guardrails in 4.0 + config_opts = {'guardrails': config_opts} + self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000, - configuration_options={'batch_size_warn_threshold_in_kb': '10'}, + configuration_options=config_opts, profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'), stress_table='stresscql.blogposts') @@ -2495,9 +2499,16 @@ class TestCqlshCopy(Tester): @jira_ticket CASSANDRA-10938 """ + batch_size_warn_threshold_in_kb = '10' + native_transport_max_concurrent_connections = '12' + if self.supports_guardrails: # batch size thresholds moved to guardrails in 4.0 + config_opts = {'guardrails': {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb}, + 'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections} + else: + config_opts = {'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections, + 'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb} self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000, - configuration_options={'native_transport_max_concurrent_connections': '12', - 'batch_size_warn_threshold_in_kb': '10'}, + configuration_options=config_opts, profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'), stress_table='stresscql.blogposts', copy_to_options={'NUMPROCESSES': 5, 'MAXATTEMPTS': 20}, @@ -2827,8 +2838,13 @@ class TestCqlshCopy(Tester): @jira_ticket CASSANDRA-11474 """ num_records = 100 - self.prepare(nodes=1, configuration_options={'batch_size_warn_threshold_in_kb': '1', # warn with 1kb and fail - 'batch_size_fail_threshold_in_kb': '5'}) # with 5kb size batches + batch_size_warn_threshold_in_kb = '1' # warn with 1kb and fail + batch_size_fail_threshold_in_kb = '5' # with 5kb size batches + config_opts = {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb, + 'batch_size_fail_threshold_in_kb': batch_size_fail_threshold_in_kb} + if self.supports_guardrails: # batch size thresholds moved to guardrails in 4.0 + config_opts = {'guardrails': config_opts} + self.prepare(nodes=1, configuration_options=config_opts) logger.debug('Running stress') stress_table_name = 'standard1' diff --git a/dtest_setup.py b/dtest_setup.py index 3980259b..863e7648 100644 --- a/dtest_setup.py +++ b/dtest_setup.py @@ -376,6 +376,10 @@ class DTestSetup(object): def supports_v5_protocol(self, cluster_version): return cluster_version >= LooseVersion('4.0') + def supports_guardrails(self): + return self.cluster.version() >= LooseVersion('4.0') + + def cleanup_last_test_dir(self): if os.path.exists(self.last_test_dir): os.remove(self.last_test_dir) diff --git a/guardrails_test.py b/guardrails_test.py new file mode 100644 index 00000000..bf883bba --- /dev/null +++ b/guardrails_test.py @@ -0,0 +1,99 @@ +import logging +import time +import pytest +import re + +from cassandra import InvalidRequest + +from dtest import Tester, create_ks +from tools.assertions import assert_one + +since = pytest.mark.since +logger = logging.getLogger(__name__) + +class BaseGuardrailsTester(Tester): + + def prepare(self, rf=1, options=None, nodes=3, install_byteman=False, extra_jvm_args=None, **kwargs): + if options is None: + options = {} + + if extra_jvm_args is None: + extra_jvm_args = [] + + cluster = self.cluster + cluster.set_log_level('TRACE') + cluster.populate(nodes, install_byteman=install_byteman) + if options: + cluster.set_configuration_options(values=options) + + cluster.start(jvm_args=extra_jvm_args) + node1 = cluster.nodelist()[0] + + session = self.patient_cql_connection(node1, **kwargs) + create_ks(session, 'ks', rf) + + return session + + +@since('4.0') +class TestGuardrails(BaseGuardrailsTester): + + def test_disk_usage_guardrail(self): + """ + Test disk usage guardrail will warn if exceeds warn threshold and reject writes if exceeds failure threshold + """ + + self.fixture_dtest_setup.ignore_log_patterns = ["Write request failed because disk usage exceeds failure threshold"] + guardrails_config = {'guardrails': {'disk_usage_percentage_warn_threshold': 98, + 'disk_usage_percentage_failure_threshold': 99}} + + logger.debug("prepare 2-node cluster with rf=1 and guardrails enabled") + session = self.prepare(rf=1, nodes=2, options=guardrails_config, extra_jvm_args=['-Dcassandra.disk_usage.monitor_interval_ms=100'], install_byteman=True) + node1, node2 = self.cluster.nodelist() + session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)") + + logger.debug("Inject FULL to node1, expect log on node1 and node2 rejects writes") + mark = node1.mark_log() + self.disk_usage_injection(node1, "full", False) + node1.watch_log_for("Adding state DISK_USAGE: FULL", filename='debug.log', from_mark=mark, timeout=10) + + # verify node2 will reject writes if node1 is the replica + session2 = self.patient_exclusive_cql_connection(node2, keyspace="ks") + rows = 100 + failed = 0 + for x in range(rows): + try: + session2.execute("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x)) + except InvalidRequest as e: + assert re.search("Write request failed because disk usage exceeds failure threshold", str(e)) + failed = failed + 1 + + assert rows != failed, "Expect node2 rejects some writes, but rejected all" + assert 0 != failed, "Expect node2 rejects some writes, but rejected nothing" + assert_one(session2, "SELECT COUNT(*) FROM t", [rows - failed]) + + logger.debug("Inject STUFFED to node1, node2 should warn client") + session2.execute("TRUNCATE t") + mark = node1.mark_log() + self.disk_usage_injection(node1, "stuffed") + node1.watch_log_for("Adding state DISK_USAGE: STUFFED", filename='debug.log', from_mark=mark, timeout=10) + + warnings = 0 + for x in range(rows): + fut = session2.execute_async("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x)) + fut.result() + if fut.warnings: + assert ["Replica disk usage exceeds warn threshold"] == fut.warnings + warnings = warnings + 1 + + assert rows != warnings,"Expect node2 emits some warnings, but got all warnings" + assert 0 != warnings,"Expect node2 emits some warnings, but got no warnings" + assert_one(session2, "SELECT COUNT(*) FROM t", [rows]) + + session.cluster.shutdown() + session2.cluster.shutdown() + + def disk_usage_injection(self, node, state, clear_byteman=True): + if clear_byteman: + node.byteman_submit(['-u']) + node.byteman_submit(["./byteman/guardrails/disk_usage_{}.btm".format(state)]) diff --git a/paging_test.py b/paging_test.py index 971c7778..e6554b85 100644 --- a/paging_test.py +++ b/paging_test.py @@ -18,6 +18,7 @@ from tools.assertions import (assert_all, assert_invalid, assert_length_equal, assert_one, assert_lists_equal_ignoring_order) from tools.data import rows_to_list from tools.datahelp import create_rows, flatten_into_set, parse_data_into_dicts +from tools.misc import restart_cluster_and_update_config from tools.paging import PageAssertionMixin, PageFetcher since = pytest.mark.since @@ -3423,19 +3424,26 @@ class TestPagingWithDeletions(BasePagingTester, PageAssertionMixin): supports_v5_protocol = self.supports_v5_protocol(self.cluster.version()) self.fixture_dtest_setup.allow_log_errors = True - self.cluster.set_configuration_options( - values={'tombstone_failure_threshold': 500} - ) + if self.supports_guardrails: + config_opts = {'guardrails': {'tombstone_failure_threshold': 500, + 'tombstone_warn_threshold': -1, + 'write_consistency_levels_disallowed': {}}} + else: + config_opts = {'tombstone_failure_threshold': 500} + restart_cluster_and_update_config(self.cluster, config_opts) self.session = self.prepare() self.setup_data() - # Add more data + if self.supports_guardrails: + # cell tombstones are not counted towards the threshold, so we delete rows + query = "delete from paging_test where id = 1 and mytext = '{}'" + else: + # Add more data + query = "insert into paging_test (id, mytext, col1) values (1, '{}', null)" + values = [uuid.uuid4() for i in range(3000)] for value in values: - self.session.execute(SimpleStatement( - "insert into paging_test (id, mytext, col1) values (1, '{}', null) ".format( - value - ), + self.session.execute(SimpleStatement(query.format(value), consistency_level=CL.ALL )) @@ -3456,7 +3464,7 @@ class TestPagingWithDeletions(BasePagingTester, PageAssertionMixin): failure_msg = ("Scanned over.* tombstones in test_paging_size." "paging_test.* query aborted") else: - failure_msg = ("Scanned over.* tombstones during query.* query aborted") + failure_msg = ("Scanned over.* (tombstones|tombstone rows) during query.* query aborted") self.cluster.wait_for_any_log(failure_msg, 25) diff --git a/pushed_notifications_test.py b/pushed_notifications_test.py index 581b0ea5..70b42ffe 100644 --- a/pushed_notifications_test.py +++ b/pushed_notifications_test.py @@ -387,11 +387,16 @@ class TestVariousNotifications(Tester): have_v5_protocol = self.supports_v5_protocol(self.cluster.version()) self.fixture_dtest_setup.allow_log_errors = True - opts={ - 'tombstone_failure_threshold': 500, - 'read_request_timeout_in_ms': 30000, # 30 seconds - 'range_request_timeout_in_ms': 40000 - } + opts={} + if self.supports_guardrails: + opts = {'guardrails': {'tombstone_warn_threshold': -1, + 'tombstone_failure_threshold': 500}, + 'read_request_timeout_in_ms': 30000, # 30 seconds + 'range_request_timeout_in_ms': 40000} + else: + opts = {'tombstone_failure_threshold': 500, + 'read_request_timeout_in_ms': 30000, # 30 seconds + 'range_request_timeout_in_ms': 40000} if self.cluster.version() >= LooseVersion('4.1'): opts['native_transport_timeout'] = '30s' self.cluster.set_configuration_options(values=opts) @@ -407,17 +412,17 @@ class TestVariousNotifications(Tester): "PRIMARY KEY (id, mytext) )" ) - # Add data with tombstones + if self.supports_guardrails: + # cell tombstones are not counted towards the threshold, so we delete rows + query = "delete from test where id = 1 and mytext = '{}'" + else: + # Add data with tombstones + query = "insert into test (id, mytext, col1) values (1, '{}', null)" values = [str(i) for i in range(1000)] for value in values: - session.execute(SimpleStatement( - "insert into test (id, mytext, col1) values (1, '{}', null) ".format( - value - ), - consistency_level=CL.ALL - )) - - failure_msg = ("Scanned over.* tombstones.* query aborted") + session.execute(SimpleStatement(query.format(value),consistency_level=CL.ALL)) + + failure_msg = ("Scanned over.* (tombstones|tombstone rows).* query aborted") @pytest.mark.timeout(25) def read_failure_query(): diff --git a/read_failures_test.py b/read_failures_test.py index 475f2781..664ca70f 100644 --- a/read_failures_test.py +++ b/read_failures_test.py @@ -4,6 +4,7 @@ import pytest from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout from cassandra.policies import FallthroughRetryPolicy from cassandra.query import SimpleStatement +from distutils.version import LooseVersion from dtest import Tester @@ -21,7 +22,9 @@ class TestReadFailures(Tester): @pytest.fixture(autouse=True) def fixture_add_additional_log_patterns(self, fixture_dtest_setup): fixture_dtest_setup.ignore_log_patterns = ( - "Scanned over [1-9][0-9]* tombstones", # This is expected when testing read failures due to tombstones + # These are expected when testing read failures due to tombstones, + "Scanned over [1-9][0-9]* tombstones", + "Scanned over [1-9][0-9]* tombstone rows", ) return fixture_dtest_setup @@ -33,9 +36,15 @@ class TestReadFailures(Tester): self.expected_expt = ReadFailure def _prepare_cluster(self): - self.cluster.set_configuration_options( - values={'tombstone_failure_threshold': self.tombstone_failure_threshold} - ) + if self.supports_guardrails: + self.cluster.set_configuration_options( + values={'guardrails': {'tombstone_warn_threshold': -1, + 'tombstone_failure_threshold': self.tombstone_failure_threshold}} + ) + else: + self.cluster.set_configuration_options( + values={'tombstone_failure_threshold': self.tombstone_failure_threshold} + ) self.cluster.populate(3) self.cluster.start() self.nodes = list(self.cluster.nodes.values()) diff --git a/tools/misc.py b/tools/misc.py index 2f43d9df..b754b5e9 100644 --- a/tools/misc.py +++ b/tools/misc.py @@ -160,3 +160,14 @@ def add_skip(cls, reason=""): else: cls.pytestmark = [pytest.mark.skip(reason)] return cls + + +def restart_cluster_and_update_config(cluster, config): + """ + Takes a new config, and applies it to a cluster. We need to restart + for it to take effect. We _could_ take a node here, but we don't want to. + If you really want to change the config of just one node, use JMX. + """ + cluster.stop() + cluster.set_configuration_options(values=config) + cluster.start() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
