[ 
https://issues.apache.org/jira/browse/CASSANDRA-18160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Semb Wever updated CASSANDRA-18160:
-------------------------------------------
    Fix Version/s: 4.1.x
                   4.x

> Test Failure: cdc_test.TestCDC.test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space
> ---------------------------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-18160
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-18160
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Test/dtest/python
>            Reporter: Michael Semb Wever
>            Priority: Normal
>             Fix For: 4.1.x, 4.x
>
>
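> Flaky. When the before/after CDC index comparison fails, the failure helper
> _fail_and_print_sets itself crashes with an AttributeError: it reads idx.name,
> but the ReplayData objects shown in the trace expose idx_name. As a result the
> intended failure message is never printed.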
> {noformat}
> self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>
>     def test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space(self):
>         """
>             Test that C* behaves correctly when CDC tables have consumed all 
> the
>             space available to them. In particular: after writing
>             cdc_total_space_in_mb MB into CDC commitlogs:
>             - CDC writes are rejected
>             - non-CDC writes are accepted
>             - on flush, CDC commitlogs are copied to cdc_raw
>             - on flush, non-CDC commitlogs are not copied to cdc_raw
>             This is a lot of behavior to validate in one test, but we do so to
>             avoid running multiple tests that each write 1MB of data to fill
>             cdc_total_space_in_mb.
>             """
>         ks_name = 'ks'
>         full_cdc_table_info = TableInfo(
>             ks_name=ks_name, table_name='full_cdc_tab',
>             column_spec=_16_uuid_column_spec,
>             insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'full_cdc_tab'),
>             options={'cdc': 'true'}
>         )
>     
>         configuration_overrides = {
>             # Make CDC space as small as possible so we can fill it quickly.
>             'cdc_total_space_in_mb': 4,
>         }
>         node, session = self.prepare(
>             ks_name=ks_name,
>             configuration_overrides=configuration_overrides
>         )
>         session.execute(full_cdc_table_info.create_stmt)
>     
>         # Later, we'll also make assertions about the behavior of non-CDC
>         # tables, so we create one here.
>         non_cdc_table_info = TableInfo(
>             ks_name=ks_name, table_name='non_cdc_tab',
>             column_spec=_16_uuid_column_spec,
>             insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'non_cdc_tab')
>         )
>         session.execute(non_cdc_table_info.create_stmt)
>         # We'll also make assertions about the behavior of CDC tables when
>         # other CDC tables have already filled the designated space for CDC
>         # commitlogs, so we create the second CDC table here.
>         empty_cdc_table_info = TableInfo(
>             ks_name=ks_name, table_name='empty_cdc_tab',
>             column_spec=_16_uuid_column_spec,
>             insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'empty_cdc_tab'),
>             options={'cdc': 'true'}
>         )
>         session.execute(empty_cdc_table_info.create_stmt)
>     
>         # Here, we insert values into the first CDC table until we get a
>         # WriteFailure. This should happen when the CDC commitlogs take up 1MB
>         # or more.
>         logger.debug('flushing non-CDC commitlogs')
>         node.flush()
>         # Then, we insert rows into the CDC table until we can't anymore.
>         logger.debug('beginning data insert to fill CDC commitlogs')
>         rows_loaded = _write_to_cdc_write_failure(session, 
> full_cdc_table_info.insert_stmt)
>     
>         assert 0 < rows_loaded, ('No CDC rows inserted. This may happen when '
>                                  'cdc_total_space_in_mb > commitlog_segment_size_in_mb')
>     
>         commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
>         commitlogs_size = size_of_files_in_dir(commitlog_dir)
>         logger.debug('Commitlog dir ({d}) is {b}B'.format(d=commitlog_dir, 
> b=commitlogs_size))
>     
>         # We should get a WriteFailure when trying to write to the CDC table
>         # that's filled the designated CDC space...
>         try:
>             session.execute(full_cdc_table_info.insert_stmt)
>             raise Exception("WriteFailure expected")
>         except WriteFailure:
>             pass
>         # or any CDC table.
>         try:
>             session.execute(empty_cdc_table_info.insert_stmt)
>             raise Exception("WriteFailure expected")
>         except WriteFailure:
>             pass
>     
>         # Now we test for behaviors of non-CDC tables when we've exceeded
>         # cdc_total_space_in_mb.
>         #
>         # First, we drain and save the names of all the new discarded CDC
>         # segments
>         node.drain()
>         session.cluster.shutdown()
>         node.stop()
>         node.start(wait_for_binary_proto=True)
>         session = self.patient_cql_connection(node)
>         pre_non_cdc_write_cdc_raw_segments = 
> _get_cdc_raw_files(node.get_path())
>     
>         # Snapshot the _cdc.idx file if > 4.0 for comparison at end
>         before_cdc_state = []  # init empty here to quiet PEP
>         if self.cluster.version() >= '4.0':
>             # Create ReplayData objects for each index file found in loading 
> cluster
>             node1_path = os.path.join(node.get_path(), 'cdc_raw')
>             before_cdc_state = [ReplayData.load(node1_path, name)
>                                 for name in os.listdir(node1_path) if 
> name.endswith('_cdc.idx')]
>     
>         # save the names of all the commitlog segments written up to this
>         # point:
>         pre_non_cdc_write_segments = _get_commitlog_files(node.get_path())
>     
>         # Check that writing to non-CDC tables succeeds even when writes to 
> CDC
>         # tables are rejected:
>         non_cdc_prepared_insert = 
> session.prepare(non_cdc_table_info.insert_stmt)
>         session.execute(non_cdc_prepared_insert, ())  # should not raise an 
> exception
>     
>         # Check the following property: any new commitlog segments written to
>         # after cdc_raw has reached its maximum configured size should not be
>         # moved to cdc_raw, on commitlog discard, because any such commitlog
>         # segments are written to non-CDC tables.
>         #
>         # First, write to non-cdc tables.
>         start, time_limit = time.time(), 600
>         rate_limited_debug = get_rate_limited_function(logger.debug, 5)
>         logger.debug('writing to non-cdc table')
>         # We write until we get a new commitlog segment.
>         while _get_commitlog_files(node.get_path()) <= 
> pre_non_cdc_write_segments:
>             elapsed = time.time() - start
>             rate_limited_debug('  non-cdc load step has lasted {s:.2f}s'.format(s=elapsed))
>             assert elapsed <= time_limit, "It's been over a {s}s and we haven't written a new " \
>                                            "commitlog segment. Something is wrong.".format(s=time_limit)
>             execute_concurrent(
>                 session,
>                 ((non_cdc_prepared_insert, ()) for _ in range(1000)),
>                 concurrency=500,
>                 raise_on_first_error=True,
>             )
>     
>         # Finally, we check that draining doesn't move any new segments to 
> cdc_raw:
>         node.drain()
>         session.cluster.shutdown()
>     
>         if self.cluster.version() < '4.0':
>             assert pre_non_cdc_write_cdc_raw_segments == 
> _get_cdc_raw_files(node.get_path())
>         else:
>             # Create ReplayData objects for each index file found in loading 
> cluster
>             node2_path = os.path.join(node.get_path(), 'cdc_raw')
>             after_cdc_state = [ReplayData.load(node2_path, name)
>                                for name in os.listdir(node2_path) if 
> name.endswith('_cdc.idx')]
>     
>             # Confirm all indexes in 1st are accounted for and match 
> corresponding entry in 2nd.
>             found = True
>             for idx in before_cdc_state:
>                 idx_found = False
>                 for idx_two in after_cdc_state:
>                     if compare_replay_data(idx, idx_two):
>                         idx_found = True
>                 if not idx_found:
>                     found = False
>                     break
>             if not found:
>                 self._fail_and_print_sets(before_cdc_state, after_cdc_state,
>                                           'Found CDC index in before not 
> matched in after (non-CDC write test)')
>     
>             # Now we confirm we don't have anything that showed up in 2nd not 
> accounted for in 1st
>             orphan_found = False
>             for idx_two in after_cdc_state:
>                 index_found = False
>                 for idx in before_cdc_state:
>                     if compare_replay_data(idx_two, idx):
>                         index_found = True
>                 if not index_found:
>                     orphan_found = True
>                     break
>             if orphan_found:
> >               self._fail_and_print_sets(before_cdc_state, after_cdc_state,
>                                           'Found orphaned index file in after 
> CDC state not in former.')
> cdc_test.py:523: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>
> rd_one = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx', 
> completed='COMPLETED', offset=2097119, 
> log_name='CommitLog-7...='CommitLog-7-1673574594101_cdc.idx', 
> completed='COMPLETED', offset=2097076, 
> log_name='CommitLog-7-1673574594101.log')]
> rd_two = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx', 
> completed='COMPLETED', offset=2097119, 
> log_name='CommitLog-7...me='CommitLog-7-1673574650056_cdc.idx', 
> completed='COMPLETED', offset=32230, 
> log_name='CommitLog-7-1673574650056.log')]
> msg = 'Found orphaned index file in after CDC state not in former.'
>     def _fail_and_print_sets(self, rd_one, rd_two, msg):
>         print('Set One:')
>         for idx in rd_one:
> >           print('   {},{},{},{}'.format(idx.name, idx.completed, idx.offset, idx.log_name))
> E           AttributeError: 'ReplayData' object has no attribute 'name'
> cdc_test.py:529: AttributeError
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to