[ https://issues.apache.org/jira/browse/BEAM-14481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17542273#comment-17542273 ]

Brian Hulette commented on BEAM-14481:
--------------------------------------

I managed to identify the root cause here. Re-entering the "fast" 
implementation of a ScopedState (this is what I did by nesting {{with 
self.scoped_start_state:}} blocks) puts it into a bad state.

The "slow" ScopedState maintains a stack of previous states, so it actually 
shouldn't repro this issue. But the "fast" ScopedState only tracks a single 
previous state index, so when you re-enter the previous state is overwritten. 
My proposed fix: add an assertion to prevent re-entering the same ScopedState 
instance (for both fast and slow), if we had this before we would have caught 
the issue in unit tests.
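
For illustration, here is a minimal sketch of the two tracking strategies and the kind of assertion I have in mind. This is not the real statesampler code (the fast path lives in Cython); the class and attribute names below are invented for the example:

{code:python}
class FakeSampler(object):
  """Stand-in for StateSampler: just tracks which state index is current."""
  def __init__(self):
    self.current_index = 0  # 0 = the outermost / unknown state
    self.state_stack = []


class SlowScopedState(object):
  """Stack-based tracking (like the "slow" path): survives nesting."""
  def __init__(self, sampler, state_index):
    self.sampler = sampler
    self.state_index = state_index
    self.nesting_depth = 0

  def __enter__(self):
    # Proposed fix: refuse to re-enter the same instance.
    assert self.nesting_depth == 0, 'ScopedState is not re-entrant'
    self.nesting_depth += 1
    self.sampler.state_stack.append(self.sampler.current_index)
    self.sampler.current_index = self.state_index

  def __exit__(self, *unused_exc_info):
    self.nesting_depth -= 1
    self.sampler.current_index = self.sampler.state_stack.pop()


class FastScopedState(object):
  """Single-slot tracking (like the "fast" path): re-entry clobbers the slot."""
  def __init__(self, sampler, state_index):
    self.sampler = sampler
    self.state_index = state_index
    self.old_state_index = 0

  def __enter__(self):
    # Re-entering overwrites old_state_index with our own index, so __exit__
    # can never restore the state that was current before the outer entry.
    self.old_state_index = self.sampler.current_index
    self.sampler.current_index = self.state_index

  def __exit__(self, *unused_exc_info):
    self.sampler.current_index = self.old_state_index
{code}

With a guard like this, the nested {{with stateb:}} in the test below would fail loudly instead of silently overwriting the saved state.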

I was able to repro the issue by adding a test like this to statesampler_test:

{code:python}
  def test_reenter_scoped_state(self):
    # Set up state sampler.
    counter_factory = CounterFactory()
    sampler = statesampler.StateSampler(
        'basic', counter_factory, sampling_period_ms=1)

    statea = sampler.scoped_state('step1', 'statea')
    stateb = sampler.scoped_state('step1', 'stateb')

    sampler.start()
    with statea:
      self.assertEqual(
          sampler.current_state().name,
          CounterName('statea-msecs', step_name='step1', stage_name='basic'))
      with stateb:
        self.assertEqual(
            sampler.current_state().name,
            CounterName('stateb-msecs', step_name='step1', stage_name='basic'))
        with stateb:
          self.assertEqual(
              sampler.current_state().name,
              CounterName('stateb-msecs', step_name='step1', stage_name='basic'))
        self.assertEqual(
            sampler.current_state().name,
            CounterName('stateb-msecs', step_name='step1', stage_name='basic'))
      # !!!! This assertion fails !!! Exiting the outer stateb block restores
      # the overwritten "previous" state (stateb) instead of statea.
      self.assertEqual(
          sampler.current_state().name,
          CounterName('statea-msecs', step_name='step1', stage_name='basic'))
{code}

It turns out there are actually several other places where we re-enter a 
ScopedState instance. I suspect the one I added only caused a visible breakage 
because it was in {{DataInputOperation}}, but it seems likely that the other 
call sites are producing obfuscated metrics.
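
To make the metrics effect concrete: with single-slot tracking, any time spent after the re-entered block exits keeps getting charged to the re-entered state instead of the outer one. A toy simulation (nothing here is real Beam API; the names are invented):

{code:python}
# Toy model of msecs attribution: "time" is charged to whichever state is
# current, roughly the way the sampling thread charges sampled time.
msecs = {'statea': 0, 'stateb': 0}
current = 'statea'
saved = None  # single slot, like the fast ScopedState's saved previous state


def enter_stateb():
  global current, saved
  saved = current  # a second enter overwrites the earlier save
  current = 'stateb'


def exit_stateb():
  global current
  current = saved


enter_stateb()  # statea -> stateb, saved = 'statea'
enter_stateb()  # stateb -> stateb, saved = 'stateb' (clobbered)
exit_stateb()   # "restores" stateb
exit_stateb()   # "restores" stateb again; we never get back to statea

msecs[current] += 100  # 100ms of work that should be charged to statea
print(msecs)  # {'statea': 0, 'stateb': 100}
{code}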

> OOM regression caused by Batched DoFn worker changes
> ----------------------------------------------------
>
>                 Key: BEAM-14481
>                 URL: https://issues.apache.org/jira/browse/BEAM-14481
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P1
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> We've observed some OOMs in internal pipelines. We've narrowed the root cause 
> down to [https://github.com/apache/beam/pull/17384] - the worker changes for 
> Batched DoFns.
> I still don't understand the root cause, but given that this change can lead 
> to a regression in pipelines that don't use the feature, I think we should 
> roll it back.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
