kennknowles opened a new issue, #19070:
URL: https://github.com/apache/beam/issues/19070
I tried to apply a window function to an existing pipeline that had been
running fine, and got the following error:
```
Traceback (most recent call last):
  File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1664, in <module>
    main()
  File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "E:/work/source/ai-data-pipeline-research/metric_pipeline/batch_beam/batch_pipeline_main.py", line 97, in <module>
    result.wait_until_finish()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
    self._executor.await_completion()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
    self._executor.await_completion()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
    six.reraise(t, v, tb)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
    finish_state)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 378, in attempt_call
    evaluator.process_element(value)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 574, in process_element
    self.runner.process(element)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 577, in process
    self._reraise_augmented(exn)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 618, in _reraise_augmented
    six.reraise(type(new_exn), new_exn, original_traceback)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 575, in process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 353, in invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 651, in process_outputs
    for result in results:
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 942, in process_entire_key
    state, windowed_values, output_watermark):
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 1098, in process_elements
    self.trigger_fn.on_element(value, window, context)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 488, in on_element
    self.underlying.on_element(element, window, context)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 535, in on_element
    trigger.on_element(element, window, self._sub_context(context, ix))
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 286, in on_element
    '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 728, in get_current_time
    return self._outer.get_current_time()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 702, in get_current_time
    return self._clock.time()
AttributeError: 'NoneType' object has no attribute 'time' [while running 'combine all sharpe_ratio to list/CombinePerKey/GroupByKey/GroupByWindow']
```
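The last frames of the traceback suggest that the processing-time trigger asks its context for the current time, and the context holds no clock. The sketch below reduces that failure to a few lines of plain Python. It is only an illustration: `FakeTriggerContext`, `FixedClock`, `schedule_processing_time_timer`, and `try_schedule` are hypothetical stand-ins, not Beam classes or functions, and the sketch assumes nothing about why the clock is unset in the real runner.

```python
class FixedClock:
    """Hypothetical clock that always reports the same time (in seconds)."""

    def __init__(self, now):
        self._now = now

    def time(self):
        return self._now


class FakeTriggerContext:
    """Hypothetical stand-in for the context seen in the traceback.

    It mirrors only the one call that fails: ``self._clock.time()``.
    """

    def __init__(self, clock=None):
        # When no clock is supplied, the attribute stays None,
        # which is the state the AttributeError points to.
        self._clock = clock

    def get_current_time(self):
        return self._clock.time()


def schedule_processing_time_timer(context, delay):
    """Compute the firing time the way the failing frame does: now + delay."""
    return context.get_current_time() + delay


def try_schedule(context, delay):
    """Return the firing time, or the error message if the clock is missing."""
    try:
        return schedule_processing_time_timer(context, delay)
    except AttributeError as exc:
        return str(exc)
```

Calling `try_schedule(FakeTriggerContext(), 60)` returns the same `'NoneType' object has no attribute 'time'` message as the report, while passing a context built with `FixedClock(1000)` returns a firing time instead.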
The composite trigger is copied from the official documentation:
https://beam.apache.org/documentation/programming-guide/#composite-triggers
The relevant pipeline source code is below:
```
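import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime, Repeatedly)
from apache_beam.transforms.window import FixedWindows

# One-minute fixed windows; fire repeatedly after 100 elements or after
# one minute of processing time, whichever comes first.
windowing = beam.WindowInto(
    FixedWindows(1 * 60),
    trigger=Repeatedly(AfterAny(AfterCount(100),
                                AfterProcessingTime(delay=1 * 60))),
    accumulation_mode=AccumulationMode.DISCARDING)

# valuesPCollection and the functions below are defined elsewhere in the
# reporter's pipeline.
(valuesPCollection
 | 'calculate sharpe_ratio' >> beam.FlatMap(fn_calculate_sharpe_ratio)
 | 'window sharpe_ratio' >> windowing
 | 'combine all sharpe_ratio to list' >>
 beam.CombineGlobally(CombineAllToListFn()).without_defaults()
 | 'store sharpe_ratio' >> beam.FlatMap(store_metric_with_now_ts))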
```
Imported from Jira
[BEAM-5132](https://issues.apache.org/jira/browse/BEAM-5132). Original Jira may
contain additional context.
Reported by: thangbui.