damccorm opened a new issue, #20976:
URL: https://github.com/apache/beam/issues/20976

   Sample error:
   
https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/
   
   ```
   
   Error Message
   AssertionError: Items in the second set but not the first:
   'stateful).beam.metric:statecache:hit: 11'
   'stateful).beam.metric:statecache:put: 1'
   'stateful).beam.metric:statecache:miss: 1'
   'stateful).beam.metric:statecache:get_total: 120'
   'stateful).beam.metric:statecache:size: 10'
   'stateful).beam.metric:statecache:get: 12'
   'stateful).beam.metric:statecache:evict: 0'
   'stateful).beam.metric:statecache:capacity: 123'
   'stateful).beam.metric:statecache:put_total: 10'
   'stateful).beam.metric:statecache:evict_total: 0'
   'counter: 110'
   'stateful).beam.metric:statecache:miss_total: 10'
   'stateful).beam.metric:statecache:hit_total: 110'
   Stacktrace
   self = 
<apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized
   testMethod=test_flink_metrics>
   
       def test_flink_metrics(self):
         """Run a simple DoFn that
   increments a counter and verifies state
         caching metrics. Verifies that its expected value is
   written to a
         temporary file by the FileReporter"""
       
         counter_name = 'elem_counter'
   
        state_spec = userstate.BagStateSpec('state', VarIntCoder())
       
         class DoFn(beam.DoFn):
   
          def __init__(self):
             self.counter = Metrics.counter(self.__class__, counter_name)
   
            _LOGGER.info('counter: %s' % self.counter.metric_name)
       
           def process(self, kv,
   state=beam.DoFn.StateParam(state_spec)):
             # Trigger materialization
             list(state.read())
   
            state.add(1)
             self.counter.inc()
       
         options = self.create_options()
    
       # Test only supports parallelism of 1
         options._all_options['parallelism'] = 1
         # Create
   multiple bundles to test cache metrics
         options._all_options['max_bundle_size'] = 10
         options._all_options['max_bundle_time_millis']
   = 95130590130
         experiments = options.view_as(DebugOptions).experiments or []
         experiments.append('state_cache_size=123')
   
        options.view_as(DebugOptions).experiments = experiments
         with Pipeline(self.get_runner(),
   options) as p:
           # pylint: disable=expression-not-assigned
           (
               p
         
        | "create" >> beam.Create(list(range(0, 110)))
               | "mapper" >> beam.Map(lambda x: (x
   % 10, 'val'))
               | "stateful" >> beam.ParDo(DoFn()))
       
         lines_expected = {'counter:
   110'}
         if options.view_as(StandardOptions).streaming:
           lines_expected.update([
        
         # Gauges for the last finished bundle
               'stateful.beam.metric:statecache:capacity:
   123',
               'stateful.beam.metric:statecache:size: 10',
               'stateful.beam.metric:statecache:get:
   20',
               'stateful.beam.metric:statecache:miss: 0',
               'stateful.beam.metric:statecache:hit:
   20',
               'stateful.beam.metric:statecache:put: 0',
               'stateful.beam.metric:statecache:evict:
   0',
               # Counters
               'stateful.beam.metric:statecache:get_total: 220',
          
       'stateful.beam.metric:statecache:miss_total: 10',
               'stateful.beam.metric:statecache:hit_total:
   210',
               'stateful.beam.metric:statecache:put_total: 10',
               'stateful.beam.metric:statecache:evict_total:
   0',
           ])
         else:
           # Batch has a different processing model. All values for
      
       # a key are processed at once.
           lines_expected.update([
               # Gauges
           
      'stateful).beam.metric:statecache:capacity: 123',
               # For the first key, the cache token
   will not be set yet.
               # It's lazily initialized after first access in 
StateRequestHandlers
   
              'stateful).beam.metric:statecache:size: 10',
               # We have 11 here because there
   are 110 / 10 elements per key
               'stateful).beam.metric:statecache:get: 12',
              
   'stateful).beam.metric:statecache:miss: 1',
               'stateful).beam.metric:statecache:hit: 11',
   
              # State is flushed back once per key
               'stateful).beam.metric:statecache:put:
   1',
               'stateful).beam.metric:statecache:evict: 0',
               # Counters
               'stateful).beam.metric:statecache:get_total:
   120',
               'stateful).beam.metric:statecache:miss_total: 10',
               'stateful).beam.metric:statecache:hit_total:
   110',
               'stateful).beam.metric:statecache:put_total: 10',
               'stateful).beam.metric:statecache:evict_total:
   0',
           ])
         lines_actual = set()
         with open(self.test_metrics_path, 'r') as f:
     
        for line in f:
             for metric_str in lines_expected:
               metric_name = metric_str.split()[0]
   
              if metric_str in line:
                 lines_actual.add(metric_str)
               elif metric_name
   in line:
                 lines_actual.add(line)
   >     self.assertSetEqual(lines_actual, lines_expected)
   E     AssertionError: Items in the second set but not the first:
   E     'stateful).beam.metric:statecache:hit: 11'
   E     'stateful).beam.metric:statecache:put: 1'
   E     'stateful).beam.metric:statecache:miss: 1'
   E     'stateful).beam.metric:statecache:get_total: 120'
   E     'stateful).beam.metric:statecache:size: 10'
   E     'stateful).beam.metric:statecache:get: 12'
   E     'stateful).beam.metric:statecache:evict: 0'
   E     'stateful).beam.metric:statecache:capacity: 123'
   E     'stateful).beam.metric:statecache:put_total: 10'
   E     'stateful).beam.metric:statecache:evict_total: 0'
   E     'counter: 110'
   E     'stateful).beam.metric:statecache:miss_total: 10'
   E     'stateful).beam.metric:statecache:hit_total: 110'
   
   apache_beam/runners/portability/flink_runner_test.py:390:
   AssertionError
   
   
   ```
   
   
   Imported from Jira [BEAM-12019](https://issues.apache.org/jira/browse/BEAM-12019). Original Jira may contain additional context.
   Reported by: tvalentyn.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to