[
https://issues.apache.org/jira/browse/BEAM-12019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17361906#comment-17361906
]
Beam JIRA Bot commented on BEAM-12019:
--------------------------------------
This issue is assigned but has not received an update in 30 days so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
> is flaky
> ------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12019
> URL: https://issues.apache.org/jira/browse/BEAM-12019
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, test-failures
> Reporter: Valentyn Tymofieiev
> Assignee: Irwin Alejandro Rodirguez Ramirez
> Priority: P1
> Labels: flake, stale-assigned
>
> Sample error:
> https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/
> {noformat}
> Error Message
> AssertionError: Items in the second set but not the first:
> 'stateful).beam.metric:statecache:hit: 11'
> 'stateful).beam.metric:statecache:put: 1'
> 'stateful).beam.metric:statecache:miss: 1'
> 'stateful).beam.metric:statecache:get_total: 120'
> 'stateful).beam.metric:statecache:size: 10'
> 'stateful).beam.metric:statecache:get: 12'
> 'stateful).beam.metric:statecache:evict: 0'
> 'stateful).beam.metric:statecache:capacity: 123'
> 'stateful).beam.metric:statecache:put_total: 10'
> 'stateful).beam.metric:statecache:evict_total: 0'
> 'counter: 110'
> 'stateful).beam.metric:statecache:miss_total: 10'
> 'stateful).beam.metric:statecache:hit_total: 110'
> Stacktrace
> self =
> <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized
> testMethod=test_flink_metrics>
> def test_flink_metrics(self):
> """Run a simple DoFn that increments a counter and verifies state
> caching metrics. Verifies that its expected value is written to a
> temporary file by the FileReporter"""
>
> counter_name = 'elem_counter'
> state_spec = userstate.BagStateSpec('state', VarIntCoder())
>
> class DoFn(beam.DoFn):
> def __init__(self):
> self.counter = Metrics.counter(self.__class__, counter_name)
> _LOGGER.info('counter: %s' % self.counter.metric_name)
>
> def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
> # Trigger materialization
> list(state.read())
> state.add(1)
> self.counter.inc()
>
> options = self.create_options()
> # Test only supports parallelism of 1
> options._all_options['parallelism'] = 1
> # Create multiple bundles to test cache metrics
> options._all_options['max_bundle_size'] = 10
> options._all_options['max_bundle_time_millis'] = 95130590130
> experiments = options.view_as(DebugOptions).experiments or []
> experiments.append('state_cache_size=123')
> options.view_as(DebugOptions).experiments = experiments
> with Pipeline(self.get_runner(), options) as p:
> # pylint: disable=expression-not-assigned
> (
> p
> | "create" >> beam.Create(list(range(0, 110)))
> | "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
> | "stateful" >> beam.ParDo(DoFn()))
>
> lines_expected = {'counter: 110'}
> if options.view_as(StandardOptions).streaming:
> lines_expected.update([
> # Gauges for the last finished bundle
> 'stateful.beam.metric:statecache:capacity: 123',
> 'stateful.beam.metric:statecache:size: 10',
> 'stateful.beam.metric:statecache:get: 20',
> 'stateful.beam.metric:statecache:miss: 0',
> 'stateful.beam.metric:statecache:hit: 20',
> 'stateful.beam.metric:statecache:put: 0',
> 'stateful.beam.metric:statecache:evict: 0',
> # Counters
> 'stateful.beam.metric:statecache:get_total: 220',
> 'stateful.beam.metric:statecache:miss_total: 10',
> 'stateful.beam.metric:statecache:hit_total: 210',
> 'stateful.beam.metric:statecache:put_total: 10',
> 'stateful.beam.metric:statecache:evict_total: 0',
> ])
> else:
> # Batch has a different processing model. All values for
> # a key are processed at once.
> lines_expected.update([
> # Gauges
> 'stateful).beam.metric:statecache:capacity: 123',
> # For the first key, the cache token will not be set yet.
> # It's lazily initialized after first access in
> # StateRequestHandlers
> 'stateful).beam.metric:statecache:size: 10',
> # We have 11 here because there are 110 / 10 elements per key
> 'stateful).beam.metric:statecache:get: 12',
> 'stateful).beam.metric:statecache:miss: 1',
> 'stateful).beam.metric:statecache:hit: 11',
> # State is flushed back once per key
> 'stateful).beam.metric:statecache:put: 1',
> 'stateful).beam.metric:statecache:evict: 0',
> # Counters
> 'stateful).beam.metric:statecache:get_total: 120',
> 'stateful).beam.metric:statecache:miss_total: 10',
> 'stateful).beam.metric:statecache:hit_total: 110',
> 'stateful).beam.metric:statecache:put_total: 10',
> 'stateful).beam.metric:statecache:evict_total: 0',
> ])
> lines_actual = set()
> with open(self.test_metrics_path, 'r') as f:
> for line in f:
> for metric_str in lines_expected:
> metric_name = metric_str.split()[0]
> if metric_str in line:
> lines_actual.add(metric_str)
> elif metric_name in line:
> lines_actual.add(line)
> > self.assertSetEqual(lines_actual, lines_expected)
> E AssertionError: Items in the second set but not the first:
> E 'stateful).beam.metric:statecache:hit: 11'
> E 'stateful).beam.metric:statecache:put: 1'
> E 'stateful).beam.metric:statecache:miss: 1'
> E 'stateful).beam.metric:statecache:get_total: 120'
> E 'stateful).beam.metric:statecache:size: 10'
> E 'stateful).beam.metric:statecache:get: 12'
> E 'stateful).beam.metric:statecache:evict: 0'
> E 'stateful).beam.metric:statecache:capacity: 123'
> E 'stateful).beam.metric:statecache:put_total: 10'
> E 'stateful).beam.metric:statecache:evict_total: 0'
> E 'counter: 110'
> E 'stateful).beam.metric:statecache:miss_total: 10'
> E 'stateful).beam.metric:statecache:hit_total: 110'
> apache_beam/runners/portability/flink_runner_test.py:390: AssertionError
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)