Michael Semb Wever created CASSANDRA-18160:
----------------------------------------------

             Summary: Test Failure: cdc_test.TestCDC.test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space
                 Key: CASSANDRA-18160
                 URL: https://issues.apache.org/jira/browse/CASSANDRA-18160
             Project: Cassandra
          Issue Type: Bug
          Components: Test/dtest/python
            Reporter: Michael Semb Wever


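Flaky. When the final "orphaned index file" check trips, the test calls
_fail_and_print_sets(), which itself raises AttributeError because it reads
idx.name while the ReplayData objects shown below expose idx_name. The run
therefore ends with the AttributeError instead of the intended failure
message ('Found orphaned index file in after CDC state not in former.').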

{noformat}
self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>

    def 
test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space(self):
        """
            Test that C* behaves correctly when CDC tables have consumed all the
            space available to them. In particular: after writing
            cdc_total_space_in_mb MB into CDC commitlogs:
            - CDC writes are rejected
            - non-CDC writes are accepted
            - on flush, CDC commitlogs are copied to cdc_raw
            - on flush, non-CDC commitlogs are not copied to cdc_raw
            This is a lot of behavior to validate in one test, but we do so to
            avoid running multiple tests that each write 1MB of data to fill
            cdc_total_space_in_mb.
            """
        ks_name = 'ks'
        full_cdc_table_info = TableInfo(
            ks_name=ks_name, table_name='full_cdc_tab',
            column_spec=_16_uuid_column_spec,
            insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'full_cdc_tab'),
            options={'cdc': 'true'}
        )
    
        configuration_overrides = {
            # Make CDC space as small as possible so we can fill it quickly.
            'cdc_total_space_in_mb': 4,
        }
        node, session = self.prepare(
            ks_name=ks_name,
            configuration_overrides=configuration_overrides
        )
        session.execute(full_cdc_table_info.create_stmt)
    
        # Later, we'll also make assertions about the behavior of non-CDC
        # tables, so we create one here.
        non_cdc_table_info = TableInfo(
            ks_name=ks_name, table_name='non_cdc_tab',
            column_spec=_16_uuid_column_spec,
            insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'non_cdc_tab')
        )
        session.execute(non_cdc_table_info.create_stmt)
        # We'll also make assertions about the behavior of CDC tables when
        # other CDC tables have already filled the designated space for CDC
        # commitlogs, so we create the second CDC table here.
        empty_cdc_table_info = TableInfo(
            ks_name=ks_name, table_name='empty_cdc_tab',
            column_spec=_16_uuid_column_spec,
            insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'empty_cdc_tab'),
            options={'cdc': 'true'}
        )
        session.execute(empty_cdc_table_info.create_stmt)
    
        # Here, we insert values into the first CDC table until we get a
        # WriteFailure. This should happen when the CDC commitlogs take up 1MB
        # or more.
        logger.debug('flushing non-CDC commitlogs')
        node.flush()
        # Then, we insert rows into the CDC table until we can't anymore.
        logger.debug('beginning data insert to fill CDC commitlogs')
        rows_loaded = _write_to_cdc_write_failure(session, 
full_cdc_table_info.insert_stmt)
    
        assert 0 < rows_loaded, ('No CDC rows inserted. This may happen when '
                                 'cdc_total_space_in_mb > 
commitlog_segment_size_in_mb')
    
        commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
        commitlogs_size = size_of_files_in_dir(commitlog_dir)
        logger.debug('Commitlog dir ({d}) is {b}B'.format(d=commitlog_dir, 
b=commitlogs_size))
    
        # We should get a WriteFailure when trying to write to the CDC table
        # that's filled the designated CDC space...
        try:
            session.execute(full_cdc_table_info.insert_stmt)
            raise Exception("WriteFailure expected")
        except WriteFailure:
            pass
        # or any CDC table.
        try:
            session.execute(empty_cdc_table_info.insert_stmt)
            raise Exception("WriteFailure expected")
        except WriteFailure:
            pass
    
        # Now we test for behaviors of non-CDC tables when we've exceeded
        # cdc_total_space_in_mb.
        #
        # First, we drain and save the names of all the new discarded CDC
        # segments
        node.drain()
        session.cluster.shutdown()
        node.stop()
        node.start(wait_for_binary_proto=True)
        session = self.patient_cql_connection(node)
        pre_non_cdc_write_cdc_raw_segments = _get_cdc_raw_files(node.get_path())
    
        # Snapshot the _cdc.idx file if > 4.0 for comparison at end
        before_cdc_state = []  # init empty here to quiet PEP
        if self.cluster.version() >= '4.0':
            # Create ReplayData objects for each index file found in loading 
cluster
            node1_path = os.path.join(node.get_path(), 'cdc_raw')
            before_cdc_state = [ReplayData.load(node1_path, name)
                                for name in os.listdir(node1_path) if 
name.endswith('_cdc.idx')]
    
        # save the names of all the commitlog segments written up to this
        # point:
        pre_non_cdc_write_segments = _get_commitlog_files(node.get_path())
    
        # Check that writing to non-CDC tables succeeds even when writes to CDC
        # tables are rejected:
        non_cdc_prepared_insert = 
session.prepare(non_cdc_table_info.insert_stmt)
        session.execute(non_cdc_prepared_insert, ())  # should not raise an 
exception
    
        # Check the following property: any new commitlog segments written to
        # after cdc_raw has reached its maximum configured size should not be
        # moved to cdc_raw, on commitlog discard, because any such commitlog
        # segments are written to non-CDC tables.
        #
        # First, write to non-cdc tables.
        start, time_limit = time.time(), 600
        rate_limited_debug = get_rate_limited_function(logger.debug, 5)
        logger.debug('writing to non-cdc table')
        # We write until we get a new commitlog segment.
        while _get_commitlog_files(node.get_path()) <= 
pre_non_cdc_write_segments:
            elapsed = time.time() - start
            rate_limited_debug('  non-cdc load step has lasted 
{s:.2f}s'.format(s=elapsed))
            assert elapsed <= time_limit, "It's been over a {s}s and we haven't 
written a new " \
                                           "commitlog segment. Something is 
wrong.".format(s=time_limit)
            execute_concurrent(
                session,
                ((non_cdc_prepared_insert, ()) for _ in range(1000)),
                concurrency=500,
                raise_on_first_error=True,
            )
    
        # Finally, we check that draining doesn't move any new segments to 
cdc_raw:
        node.drain()
        session.cluster.shutdown()
    
        if self.cluster.version() < '4.0':
            assert pre_non_cdc_write_cdc_raw_segments == 
_get_cdc_raw_files(node.get_path())
        else:
            # Create ReplayData objects for each index file found in loading 
cluster
            node2_path = os.path.join(node.get_path(), 'cdc_raw')
            after_cdc_state = [ReplayData.load(node2_path, name)
                               for name in os.listdir(node2_path) if 
name.endswith('_cdc.idx')]
    
            # Confirm all indexes in 1st are accounted for and match 
corresponding entry in 2nd.
            found = True
            for idx in before_cdc_state:
                idx_found = False
                for idx_two in after_cdc_state:
                    if compare_replay_data(idx, idx_two):
                        idx_found = True
                if not idx_found:
                    found = False
                    break
            if not found:
                self._fail_and_print_sets(before_cdc_state, after_cdc_state,
                                          'Found CDC index in before not 
matched in after (non-CDC write test)')
    
            # Now we confirm we don't have anything that showed up in 2nd not 
accounted for in 1st
            orphan_found = False
            for idx_two in after_cdc_state:
                index_found = False
                for idx in before_cdc_state:
                    if compare_replay_data(idx_two, idx):
                        index_found = True
                if not index_found:
                    orphan_found = True
                    break
            if orphan_found:
>               self._fail_and_print_sets(before_cdc_state, after_cdc_state,
                                          'Found orphaned index file in after 
CDC state not in former.')

cdc_test.py:523: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <cdc_test.TestCDC object at 0x7fe8b2e01f10>
rd_one = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx', 
completed='COMPLETED', offset=2097119, 
log_name='CommitLog-7...='CommitLog-7-1673574594101_cdc.idx', 
completed='COMPLETED', offset=2097076, 
log_name='CommitLog-7-1673574594101.log')]
rd_two = [ReplayData(idx_name='CommitLog-7-1673574594100_cdc.idx', 
completed='COMPLETED', offset=2097119, 
log_name='CommitLog-7...me='CommitLog-7-1673574650056_cdc.idx', 
completed='COMPLETED', offset=32230, log_name='CommitLog-7-1673574650056.log')]
msg = 'Found orphaned index file in after CDC state not in former.'

    def _fail_and_print_sets(self, rd_one, rd_two, msg):
        print('Set One:')
        for idx in rd_one:
>           print('   {},{},{},{}'.format(idx.name, idx.completed, idx.offset, 
> idx.log_name))
E           AttributeError: 'ReplayData' object has no attribute 'name'

cdc_test.py:529: AttributeError
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to