[ 
https://issues.apache.org/jira/browse/BEAM-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583322#comment-16583322
 ] 

Subhash commented on BEAM-5132:
-------------------------------

Hey [~thangbui] and [~altay]! After some breakpoint debugging, I found a clue as 
to why this error is happening.

(The Apache Beam version I'm using is 2.5.0).

 

So, on line 134 of fn_api_runner.py, there's a call to the 
'create_trigger_driver' function in the trigger.py file.

Here's an excerpt:

 
{code:python}
def __iter__(self):
  output_stream = create_OutputStream()
  
  if self._windowing.is_default():
    globally_window = GlobalWindows.windowed_value(None).with_value
    windowed_key_values = lambda key, values: [globally_window((key, values))]
  else:
    
    # *********** THIS LINE BELOW!!! ***************
    trigger_driver = trigger.create_trigger_driver(self._windowing, True)
    
    windowed_key_values = trigger_driver.process_entire_key
  
  coder_impl = self._post_grouped_coder.get_impl()
  key_coder_impl = self._key_coder.get_impl()
  for encoded_key, windowed_values in self._table.items():
    key = key_coder_impl.decode(encoded_key)
    for wkvs in windowed_key_values(key, windowed_values):
      coder_impl.encode_to_stream(wkvs, output_stream, True)
  return iter([output_stream.get()])
{code}
However, if you look at the signature of the create_trigger_driver function 
in trigger.py, it's like this:
{code:python}
def create_trigger_driver(windowing,
                          is_batch=False, phased_combine_fn=None, clock=None):
  """Create the TriggerDriver for the given windowing and options."""

  # TODO(robertwb): We can do more if we know elements are in timestamp
  # sorted order.
  if windowing.is_default() and is_batch:
    driver = DiscardingGlobalTriggerDriver()
  elif (windowing.windowfn == GlobalWindows()
        and windowing.triggerfn == AfterCount(1)
        and windowing.accumulation_mode == AccumulationMode.DISCARDING):
    # Here we also just pass through all the values every time.
    driver = DiscardingGlobalTriggerDriver()
  else:
    driver = GeneralTriggerDriver(windowing, clock)
{code}
Notice how in the first excerpt, the call to create_trigger_driver only 
specifies the windowing and is_batch parameters, so create_trigger_driver 
falls back to the default value for clock (since this param wasn't given by 
the caller), which is None.

 

This then ripples down and eventually causes the error in TriggerContext saying 
that its clock value is None. After all, a "non-None" clock value was never 
given!
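
To make the chain above concrete, here is a minimal sketch of the mechanism as I understand it. Everything in it is a stand-in and not Beam code: SketchContext, SketchDriver, create_sketch_driver and WallClock are hypothetical names I made up, and the only things taken from the real code are the clock=None default and the self._clock.time() call from the traceback.

```python
import time


class SketchContext(object):
    """Hypothetical stand-in for TriggerContext: it only reads from its clock."""

    def __init__(self, clock):
        self._clock = clock

    def get_current_time(self):
        # Same call as the last frame of the traceback.
        return self._clock.time()


class SketchDriver(object):
    """Hypothetical stand-in for GeneralTriggerDriver: it keeps the clock it got."""

    def __init__(self, windowing, clock):
        self.windowing = windowing
        self.clock = clock

    def process(self):
        # The clock is handed on unchanged, so a None here is a None there.
        return SketchContext(self.clock).get_current_time()


def create_sketch_driver(windowing, is_batch=False, clock=None):
    """Hypothetical factory with the same clock=None default as the excerpt."""
    return SketchDriver(windowing, clock)


class WallClock(object):
    """Hypothetical clock: any object with a time() method satisfies the stand-in."""

    def time(self):
        return time.time()


# Called the way fn_api_runner.py calls the real function: no clock argument.
driver = create_sketch_driver('some windowing', True)
try:
    driver.process()
    error_message = None
except AttributeError as exn:
    error_message = str(exn)

print(error_message)

# With a clock supplied, the same stand-in code path works.
now = create_sketch_driver('some windowing', True, clock=WallClock()).process()
```

In this sketch, the first call fails with the same AttributeError message as in the report, and the second call succeeds because the clock has a time() method. Whether a wall-clock style object is the right thing to pass in the real runner is exactly what I'm unsure about.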

 

So, how can we fix this? What should be passed in for the clock parameter in 
the fn_api_runner.py module?

Once I know this, I would be happy to fix it and submit a PR.

 

Thanks! :)

> Composite windowing fail with exception: AttributeError: 'NoneType' object 
> has no attribute 'time'
> --------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5132
>                 URL: https://issues.apache.org/jira/browse/BEAM-5132
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.5.0
>         Environment: Windows machine
>            Reporter: Bui Nguyen Thang
>            Priority: Major
>
> Tried to apply a window function to the existing pipeline that was running 
> fine. Got the following error:
> {code:java}
> Traceback (most recent call last):
>   File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1664, in 
> <module>
>     main()
>   File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1658, in 
> main
>     globals = debugger.run(setup['file'], None, None, is_module)
>   File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1068, in 
> run
>     pydev_imports.execfile(file, globals, locals)  # execute the script
>   File 
> "E:/work/source/ai-data-pipeline-research/metric_pipeline/batch_beam/batch_pipeline_main.py",
>  line 97, in <module>
>     result.wait_until_finish()
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\direct_runner.py",
>  line 421, in wait_until_finish
>     self._executor.await_completion()
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
>  line 398, in await_completion
>     self._executor.await_completion()
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
>  line 444, in await_completion
>     six.reraise(t, v, tb)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
>  line 341, in call
>     finish_state)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
>  line 378, in attempt_call
>     evaluator.process_element(value)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py",
>  line 574, in process_element
>     self.runner.process(element)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
>  line 577, in process
>     self._reraise_augmented(exn)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
>  line 618, in _reraise_augmented
>     six.reraise(type(new_exn), new_exn, original_traceback)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
>  line 575, in process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
>  line 353, in invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
>  line 651, in process_outputs
>     for result in results:
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
>  line 942, in process_entire_key
>     state, windowed_values, output_watermark):
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
>  line 1098, in process_elements
>     self.trigger_fn.on_element(value, window, context)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
>  line 488, in on_element
>     self.underlying.on_element(element, window, context)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
>  line 535, in on_element
>     trigger.on_element(element, window, self._sub_context(context, ix))
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
>  line 286, in on_element
>     '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
>  line 728, in get_current_time
>     return self._outer.get_current_time()
>   File 
> "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
>  line 702, in get_current_time
>     return self._clock.time()
> AttributeError: 'NoneType' object has no attribute 'time' [while running 
> 'combine all sharpe_ratio to list/CombinePerKey/GroupByKey/GroupByWindow']
> {code}
> the composite window function is copied from official documents:
> https://beam.apache.org/documentation/programming-guide/#composite-triggers
> Please refer to pipeline relevant source code below:
> {code:python}
> windowing = beam.WindowInto(FixedWindows(1 * 60),
>                             trigger=Repeatedly(AfterAny(AfterCount(100), 
> AfterProcessingTime(delay=1 * 60))),
>                             accumulation_mode=AccumulationMode.DISCARDING)
> valuesPCollection \
> | 'calculate sharpe_ratio' >> beam.FlatMap(fn_calculate_sharpe_ratio) \
> | 'window sharpe_ratio' >> windowing \
> | 'combine all sharpe_ratio to list' >> 
> beam.CombineGlobally(CombineAllToListFn()).without_defaults() \
> | 'store sharpe_ratio' >> beam.FlatMap(store_metric_with_now_ts)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
