damccorm opened a new issue, #20976: URL: https://github.com/apache/beam/issues/20976
Sample error: https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/

```
Error Message

AssertionError: Items in the second set but not the first:
'stateful).beam.metric:statecache:hit: 11'
'stateful).beam.metric:statecache:put: 1'
'stateful).beam.metric:statecache:miss: 1'
'stateful).beam.metric:statecache:get_total: 120'
'stateful).beam.metric:statecache:size: 10'
'stateful).beam.metric:statecache:get: 12'
'stateful).beam.metric:statecache:evict: 0'
'stateful).beam.metric:statecache:capacity: 123'
'stateful).beam.metric:statecache:put_total: 10'
'stateful).beam.metric:statecache:evict_total: 0'
'counter: 110'
'stateful).beam.metric:statecache:miss_total: 10'
'stateful).beam.metric:statecache:hit_total: 110'

Stacktrace

self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized testMethod=test_flink_metrics>

    def test_flink_metrics(self):
      """Run a simple DoFn that increments a counter and verifies state caching metrics.
      Verifies that its expected value is written to a temporary file by the FileReporter"""
      counter_name = 'elem_counter'
      state_spec = userstate.BagStateSpec('state', VarIntCoder())

      class DoFn(beam.DoFn):
        def __init__(self):
          self.counter = Metrics.counter(self.__class__, counter_name)
          _LOGGER.info('counter: %s' % self.counter.metric_name)

        def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
          # Trigger materialization
          list(state.read())
          state.add(1)
          self.counter.inc()

      options = self.create_options()
      # Test only supports parallelism of 1
      options._all_options['parallelism'] = 1
      # Create multiple bundles to test cache metrics
      options._all_options['max_bundle_size'] = 10
      options._all_options['max_bundle_time_millis'] = 95130590130
      experiments = options.view_as(DebugOptions).experiments or []
      experiments.append('state_cache_size=123')
      options.view_as(DebugOptions).experiments = experiments
      with Pipeline(self.get_runner(), options) as p:
        # pylint: disable=expression-not-assigned
        (
            p
            | "create" >> beam.Create(list(range(0, 110)))
            | "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
            | "stateful" >> beam.ParDo(DoFn()))

      lines_expected = {'counter: 110'}
      if options.view_as(StandardOptions).streaming:
        lines_expected.update([
            # Gauges for the last finished bundle
            'stateful.beam.metric:statecache:capacity: 123',
            'stateful.beam.metric:statecache:size: 10',
            'stateful.beam.metric:statecache:get: 20',
            'stateful.beam.metric:statecache:miss: 0',
            'stateful.beam.metric:statecache:hit: 20',
            'stateful.beam.metric:statecache:put: 0',
            'stateful.beam.metric:statecache:evict: 0',
            # Counters
            'stateful.beam.metric:statecache:get_total: 220',
            'stateful.beam.metric:statecache:miss_total: 10',
            'stateful.beam.metric:statecache:hit_total: 210',
            'stateful.beam.metric:statecache:put_total: 10',
            'stateful.beam.metric:statecache:evict_total: 0',
        ])
      else:
        # Batch has a different processing model. All values for
        # a key are processed at once.
        lines_expected.update([
            # Gauges
            'stateful).beam.metric:statecache:capacity: 123',
            # For the first key, the cache token will not be set yet.
            # It's lazily initialized after first access in StateRequestHandlers
            'stateful).beam.metric:statecache:size: 10',
            # We have 11 here because there are 110 / 10 elements per key
            'stateful).beam.metric:statecache:get: 12',
            'stateful).beam.metric:statecache:miss: 1',
            'stateful).beam.metric:statecache:hit: 11',
            # State is flushed back once per key
            'stateful).beam.metric:statecache:put: 1',
            'stateful).beam.metric:statecache:evict: 0',
            # Counters
            'stateful).beam.metric:statecache:get_total: 120',
            'stateful).beam.metric:statecache:miss_total: 10',
            'stateful).beam.metric:statecache:hit_total: 110',
            'stateful).beam.metric:statecache:put_total: 10',
            'stateful).beam.metric:statecache:evict_total: 0',
        ])
      lines_actual = set()
      with open(self.test_metrics_path, 'r') as f:
        for line in f:
          for metric_str in lines_expected:
            metric_name = metric_str.split()[0]
            if metric_str in line:
              lines_actual.add(metric_str)
            elif metric_name in line:
              lines_actual.add(line)
>     self.assertSetEqual(lines_actual, lines_expected)
E     AssertionError: Items in the second set but not the first:
E     'stateful).beam.metric:statecache:hit: 11'
E     'stateful).beam.metric:statecache:put: 1'
E     'stateful).beam.metric:statecache:miss: 1'
E     'stateful).beam.metric:statecache:get_total: 120'
E     'stateful).beam.metric:statecache:size: 10'
E     'stateful).beam.metric:statecache:get: 12'
E     'stateful).beam.metric:statecache:evict: 0'
E     'stateful).beam.metric:statecache:capacity: 123'
E     'stateful).beam.metric:statecache:put_total: 10'
E     'stateful).beam.metric:statecache:evict_total: 0'
E     'counter: 110'
E     'stateful).beam.metric:statecache:miss_total: 10'
E     'stateful).beam.metric:statecache:hit_total: 110'

apache_beam/runners/portability/flink_runner_test.py:390: AssertionError
```

Imported from Jira [BEAM-12019](https://issues.apache.org/jira/browse/BEAM-12019).
Original Jira may contain additional context.
Reported by: tvalentyn.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
