Michael Semb Wever created CASSANDRA-18160:
----------------------------------------------
Summary: Test Failure: cdc_test.TestCDC.test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space
Key: CASSANDRA-18160
URL: https://issues.apache.org/jira/browse/CASSANDRA-18160
Project: Cassandra
Issue Type: Bug
Components: Test/dtest/python
Reporter: Michael Semb Wever
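This test is flaky. In the failing run below, the final check reported an
orphaned index file ("Found orphaned index file in after CDC state not in
former."). That failure is then masked by a second error in the reporting
helper: _fail_and_print_sets reads idx.name, but the ReplayData repr in the
output shows the field is idx_name, so the helper raises AttributeError
instead of printing the two sets.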
{noformat}
self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>
def
test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space(self):
"""
Test that C* behaves correctly when CDC tables have consumed all the
space available to them. In particular: after writing
cdc_total_space_in_mb MB into CDC commitlogs:
- CDC writes are rejected
- non-CDC writes are accepted
- on flush, CDC commitlogs are copied to cdc_raw
- on flush, non-CDC commitlogs are not copied to cdc_raw
This is a lot of behavior to validate in one test, but we do so to
avoid running multiple tests that each write 1MB of data to fill
cdc_total_space_in_mb.
"""
ks_name = 'ks'
full_cdc_table_info = TableInfo(
ks_name=ks_name, table_name='full_cdc_tab',
column_spec=_16_uuid_column_spec,
insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'full_cdc_tab'),
options={'cdc': 'true'}
)
configuration_overrides = {
# Make CDC space as small as possible so we can fill it quickly.
'cdc_total_space_in_mb': 4,
}
node, session = self.prepare(
ks_name=ks_name,
configuration_overrides=configuration_overrides
)
session.execute(full_cdc_table_info.create_stmt)
# Later, we'll also make assertions about the behavior of non-CDC
# tables, so we create one here.
non_cdc_table_info = TableInfo(
ks_name=ks_name, table_name='non_cdc_tab',
column_spec=_16_uuid_column_spec,
insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'non_cdc_tab')
)
session.execute(non_cdc_table_info.create_stmt)
# We'll also make assertions about the behavior of CDC tables when
# other CDC tables have already filled the designated space for CDC
# commitlogs, so we create the second CDC table here.
empty_cdc_table_info = TableInfo(
ks_name=ks_name, table_name='empty_cdc_tab',
column_spec=_16_uuid_column_spec,
insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'empty_cdc_tab'),
options={'cdc': 'true'}
)
session.execute(empty_cdc_table_info.create_stmt)
# Here, we insert values into the first CDC table until we get a
# WriteFailure. This should happen when the CDC commitlogs take up 1MB
# or more.
logger.debug('flushing non-CDC commitlogs')
node.flush()
# Then, we insert rows into the CDC table until we can't anymore.
logger.debug('beginning data insert to fill CDC commitlogs')
rows_loaded = _write_to_cdc_write_failure(session,
full_cdc_table_info.insert_stmt)
assert 0 < rows_loaded, ('No CDC rows inserted. This may happen when '
'cdc_total_space_in_mb >
commitlog_segment_size_in_mb')
commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
commitlogs_size = size_of_files_in_dir(commitlog_dir)
logger.debug('Commitlog dir ({d}) is {b}B'.format(d=commitlog_dir,
b=commitlogs_size))
# We should get a WriteFailure when trying to write to the CDC table
# that's filled the designated CDC space...
try:
session.execute(full_cdc_table_info.insert_stmt)
raise Exception("WriteFailure expected")
except WriteFailure:
pass
# or any CDC table.
try:
session.execute(empty_cdc_table_info.insert_stmt)
raise Exception("WriteFailure expected")
except WriteFailure:
pass
# Now we test for behaviors of non-CDC tables when we've exceeded
# cdc_total_space_in_mb.
#
# First, we drain and save the names of all the new discarded CDC
# segments
node.drain()
session.cluster.shutdown()
node.stop()
node.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node)
pre_non_cdc_write_cdc_raw_segments = _get_cdc_raw_files(node.get_path())
# Snapshot the _cdc.idx file if > 4.0 for comparison at end
before_cdc_state = [] # init empty here to quiet PEP
if self.cluster.version() >= '4.0':
# Create ReplayData objects for each index file found in loading
cluster
node1_path = os.path.join(node.get_path(), 'cdc_raw')
before_cdc_state = [ReplayData.load(node1_path, name)
for name in os.listdir(node1_path) if
name.endswith('_cdc.idx')]
# save the names of all the commitlog segments written up to this
# point:
pre_non_cdc_write_segments = _get_commitlog_files(node.get_path())
# Check that writing to non-CDC tables succeeds even when writes to CDC
# tables are rejected:
non_cdc_prepared_insert =
session.prepare(non_cdc_table_info.insert_stmt)
session.execute(non_cdc_prepared_insert, ()) # should not raise an
exception
# Check the following property: any new commitlog segments written to
# after cdc_raw has reached its maximum configured size should not be
# moved to cdc_raw, on commitlog discard, because any such commitlog
# segments are written to non-CDC tables.
#
# First, write to non-cdc tables.
start, time_limit = time.time(), 600
rate_limited_debug = get_rate_limited_function(logger.debug, 5)
logger.debug('writing to non-cdc table')
# We write until we get a new commitlog segment.
while _get_commitlog_files(node.get_path()) <=
pre_non_cdc_write_segments:
elapsed = time.time() - start
rate_limited_debug(' non-cdc load step has lasted
{s:.2f}s'.format(s=elapsed))
assert elapsed <= time_limit, "It's been over a {s}s and we haven't
written a new " \
"commitlog segment. Something is
wrong.".format(s=time_limit)
execute_concurrent(
session,
((non_cdc_prepared_insert, ()) for _ in range(1000)),
concurrency=500,
raise_on_first_error=True,
)
# Finally, we check that draining doesn't move any new segments to
cdc_raw:
node.drain()
session.cluster.shutdown()
if self.cluster.version() < '4.0':
assert pre_non_cdc_write_cdc_raw_segments ==
_get_cdc_raw_files(node.get_path())
else:
# Create ReplayData objects for each index file found in loading
cluster
node2_path = os.path.join(node.get_path(), 'cdc_raw')
after_cdc_state = [ReplayData.load(node2_path, name)
for name in os.listdir(node2_path) if
name.endswith('_cdc.idx')]
# Confirm all indexes in 1st are accounted for and match
corresponding entry in 2nd.
found = True
for idx in before_cdc_state:
idx_found = False
for idx_two in after_cdc_state:
if compare_replay_data(idx, idx_two):
idx_found = True
if not idx_found:
found = False
break
if not found:
self._fail_and_print_sets(before_cdc_state, after_cdc_state,
'Found CDC index in before not
matched in after (non-CDC write test)')
# Now we confirm we don't have anything that showed up in 2nd not
accounted for in 1st
orphan_found = False
for idx_two in after_cdc_state:
index_found = False
for idx in before_cdc_state:
if compare_replay_data(idx_two, idx):
index_found = True
if not index_found:
orphan_found = True
break
if orphan_found:
> self._fail_and_print_sets(before_cdc_state, after_cdc_state,
'Found orphaned index file in after
CDC state not in former.')
cdc_test.py:523:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>
rd_one = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx',
completed='COMPLETED', offset=2097119,
log_name='CommitLog-7...='CommitLog-7-1673574594101_cdc.idx',
completed='COMPLETED', offset=2097076,
log_name='CommitLog-7-1673574594101.log')]
rd_two = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx',
completed='COMPLETED', offset=2097119,
log_name='CommitLog-7...me='CommitLog-7-1673574650056_cdc.idx',
completed='COMPLETED', offset=32230, log_name='CommitLog-7-1673574650056.log')]
msg = 'Found orphaned index file in after CDC state not in former.'
def _fail_and_print_sets(self, rd_one, rd_two, msg):
print('Set One:')
for idx in rd_one:
> print(' {},{},{},{}'.format(idx.name, idx.completed, idx.offset,
> idx.log_name))
E AttributeError: 'ReplayData' object has no attribute 'name'
cdc_test.py:529: AttributeError
{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]