[ https://issues.apache.org/jira/browse/CASSANDRA-18160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Semb Wever updated CASSANDRA-18160:
-------------------------------------------
Fix Version/s: 4.1.x, 4.x
> Test Failure: cdc_test.TestCDC.test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space
> ---------------------------------------------------------------------------------------------------
>
> Key: CASSANDRA-18160
> URL: https://issues.apache.org/jira/browse/CASSANDRA-18160
> Project: Cassandra
> Issue Type: Bug
> Components: Test/dtest/python
> Reporter: Michael Semb Wever
> Priority: Normal
> Fix For: 4.1.x, 4.x
>
>
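> Flaky. In the run below the test reached the "orphaned index file" failure
> path, and the reporting helper _fail_and_print_sets then raised an
> AttributeError itself: it reads idx.name, while the ReplayData repr shows the
> field as idx_name.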
> {noformat}
> self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>
> def
> test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space(self):
> """
> Test that C* behaves correctly when CDC tables have consumed all
> the
> space available to them. In particular: after writing
> cdc_total_space_in_mb MB into CDC commitlogs:
> - CDC writes are rejected
> - non-CDC writes are accepted
> - on flush, CDC commitlogs are copied to cdc_raw
> - on flush, non-CDC commitlogs are not copied to cdc_raw
> This is a lot of behavior to validate in one test, but we do so to
> avoid running multiple tests that each write 1MB of data to fill
> cdc_total_space_in_mb.
> """
> ks_name = 'ks'
> full_cdc_table_info = TableInfo(
> ks_name=ks_name, table_name='full_cdc_tab',
> column_spec=_16_uuid_column_spec,
> insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'full_cdc_tab'),
> options={'cdc': 'true'}
> )
>
> configuration_overrides = {
> # Make CDC space as small as possible so we can fill it quickly.
> 'cdc_total_space_in_mb': 4,
> }
> node, session = self.prepare(
> ks_name=ks_name,
> configuration_overrides=configuration_overrides
> )
> session.execute(full_cdc_table_info.create_stmt)
>
> # Later, we'll also make assertions about the behavior of non-CDC
> # tables, so we create one here.
> non_cdc_table_info = TableInfo(
> ks_name=ks_name, table_name='non_cdc_tab',
> column_spec=_16_uuid_column_spec,
> insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'non_cdc_tab')
> )
> session.execute(non_cdc_table_info.create_stmt)
> # We'll also make assertions about the behavior of CDC tables when
> # other CDC tables have already filled the designated space for CDC
> # commitlogs, so we create the second CDC table here.
> empty_cdc_table_info = TableInfo(
> ks_name=ks_name, table_name='empty_cdc_tab',
> column_spec=_16_uuid_column_spec,
> insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'empty_cdc_tab'),
> options={'cdc': 'true'}
> )
> session.execute(empty_cdc_table_info.create_stmt)
>
> # Here, we insert values into the first CDC table until we get a
> # WriteFailure. This should happen when the CDC commitlogs take up 1MB
> # or more.
> logger.debug('flushing non-CDC commitlogs')
> node.flush()
> # Then, we insert rows into the CDC table until we can't anymore.
> logger.debug('beginning data insert to fill CDC commitlogs')
> rows_loaded = _write_to_cdc_write_failure(session,
> full_cdc_table_info.insert_stmt)
>
> assert 0 < rows_loaded, ('No CDC rows inserted. This may happen when '
> 'cdc_total_space_in_mb >
> commitlog_segment_size_in_mb')
>
> commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
> commitlogs_size = size_of_files_in_dir(commitlog_dir)
> logger.debug('Commitlog dir ({d}) is {b}B'.format(d=commitlog_dir,
> b=commitlogs_size))
>
> # We should get a WriteFailure when trying to write to the CDC table
> # that's filled the designated CDC space...
> try:
> session.execute(full_cdc_table_info.insert_stmt)
> raise Exception("WriteFailure expected")
> except WriteFailure:
> pass
> # or any CDC table.
> try:
> session.execute(empty_cdc_table_info.insert_stmt)
> raise Exception("WriteFailure expected")
> except WriteFailure:
> pass
>
> # Now we test for behaviors of non-CDC tables when we've exceeded
> # cdc_total_space_in_mb.
> #
> # First, we drain and save the names of all the new discarded CDC
> # segments
> node.drain()
> session.cluster.shutdown()
> node.stop()
> node.start(wait_for_binary_proto=True)
> session = self.patient_cql_connection(node)
> pre_non_cdc_write_cdc_raw_segments =
> _get_cdc_raw_files(node.get_path())
>
> # Snapshot the _cdc.idx file if > 4.0 for comparison at end
> before_cdc_state = [] # init empty here to quiet PEP
> if self.cluster.version() >= '4.0':
> # Create ReplayData objects for each index file found in loading
> cluster
> node1_path = os.path.join(node.get_path(), 'cdc_raw')
> before_cdc_state = [ReplayData.load(node1_path, name)
> for name in os.listdir(node1_path) if
> name.endswith('_cdc.idx')]
>
> # save the names of all the commitlog segments written up to this
> # point:
> pre_non_cdc_write_segments = _get_commitlog_files(node.get_path())
>
> # Check that writing to non-CDC tables succeeds even when writes to
> CDC
> # tables are rejected:
> non_cdc_prepared_insert =
> session.prepare(non_cdc_table_info.insert_stmt)
> session.execute(non_cdc_prepared_insert, ()) # should not raise an
> exception
>
> # Check the following property: any new commitlog segments written to
> # after cdc_raw has reached its maximum configured size should not be
> # moved to cdc_raw, on commitlog discard, because any such commitlog
> # segments are written to non-CDC tables.
> #
> # First, write to non-cdc tables.
> start, time_limit = time.time(), 600
> rate_limited_debug = get_rate_limited_function(logger.debug, 5)
> logger.debug('writing to non-cdc table')
> # We write until we get a new commitlog segment.
> while _get_commitlog_files(node.get_path()) <=
> pre_non_cdc_write_segments:
> elapsed = time.time() - start
> rate_limited_debug(' non-cdc load step has lasted
> {s:.2f}s'.format(s=elapsed))
> assert elapsed <= time_limit, "It's been over a {s}s and we
> haven't written a new " \
> "commitlog segment. Something is
> wrong.".format(s=time_limit)
> execute_concurrent(
> session,
> ((non_cdc_prepared_insert, ()) for _ in range(1000)),
> concurrency=500,
> raise_on_first_error=True,
> )
>
> # Finally, we check that draining doesn't move any new segments to
> cdc_raw:
> node.drain()
> session.cluster.shutdown()
>
> if self.cluster.version() < '4.0':
> assert pre_non_cdc_write_cdc_raw_segments ==
> _get_cdc_raw_files(node.get_path())
> else:
> # Create ReplayData objects for each index file found in loading
> cluster
> node2_path = os.path.join(node.get_path(), 'cdc_raw')
> after_cdc_state = [ReplayData.load(node2_path, name)
> for name in os.listdir(node2_path) if
> name.endswith('_cdc.idx')]
>
> # Confirm all indexes in 1st are accounted for and match
> corresponding entry in 2nd.
> found = True
> for idx in before_cdc_state:
> idx_found = False
> for idx_two in after_cdc_state:
> if compare_replay_data(idx, idx_two):
> idx_found = True
> if not idx_found:
> found = False
> break
> if not found:
> self._fail_and_print_sets(before_cdc_state, after_cdc_state,
> 'Found CDC index in before not
> matched in after (non-CDC write test)')
>
> # Now we confirm we don't have anything that showed up in 2nd not
> accounted for in 1st
> orphan_found = False
> for idx_two in after_cdc_state:
> index_found = False
> for idx in before_cdc_state:
> if compare_replay_data(idx_two, idx):
> index_found = True
> if not index_found:
> orphan_found = True
> break
> if orphan_found:
> > self._fail_and_print_sets(before_cdc_state, after_cdc_state,
> 'Found orphaned index file in after
> CDC state not in former.')
> cdc_test.py:523:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _
> self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>
> rd_one = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx',
> completed='COMPLETED', offset=2097119,
> log_name='CommitLog-7...='CommitLog-7-1673574594101_cdc.idx',
> completed='COMPLETED', offset=2097076,
> log_name='CommitLog-7-1673574594101.log')]
> rd_two = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx',
> completed='COMPLETED', offset=2097119,
> log_name='CommitLog-7...me='CommitLog-7-1673574650056_cdc.idx',
> completed='COMPLETED', offset=32230,
> log_name='CommitLog-7-1673574650056.log')]
> msg = 'Found orphaned index file in after CDC state not in former.'
> def _fail_and_print_sets(self, rd_one, rd_two, msg):
> print('Set One:')
> for idx in rd_one:
> > print(' {},{},{},{}'.format(idx.name, idx.completed,
> > idx.offset, idx.log_name))
> E AttributeError: 'ReplayData' object has no attribute 'name'
> cdc_test.py:529: AttributeError
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]