kennknowles opened a new issue, #19070:
URL: https://github.com/apache/beam/issues/19070

   I tried to apply a window function to an existing pipeline that was running 
fine, and got the following error:
   ```
   
   Traceback (most recent call last):
     File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py",
   line 1664, in <module>
       main()
     File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line
   1658, in main
       globals = debugger.run(setup['file'], None, None, is_module)
     File "E:\soft\ide\PyCharm
   2018.1.2\helpers\pydev\pydevd.py", line 1068, in run
       pydev_imports.execfile(file, globals, locals)
    # execute the script
     File 
"E:/work/source/ai-data-pipeline-research/metric_pipeline/batch_beam/batch_pipeline_main.py",
   line 97, in <module>
       result.wait_until_finish()
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\direct_runner.py",
   line 421, in wait_until_finish
       self._executor.await_completion()
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
   line 398, in await_completion
       self._executor.await_completion()
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
   line 444, in await_completion
       six.reraise(t, v, tb)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
   line 341, in call
       finish_state)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py",
   line 378, in attempt_call
       evaluator.process_element(value)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py",
   line 574, in process_element
       self.runner.process(element)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
   line 577, in process
       self._reraise_augmented(exn)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
   line 618, in _reraise_augmented
       six.reraise(type(new_exn), new_exn, original_traceback)
     File
   
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
   line 575, in process
       self.do_fn_invoker.invoke_process(windowed_value)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
   line 353, in invoke_process
       windowed_value, self.process_method(windowed_value.value))
     File
   
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py",
   line 651, in process_outputs
       for result in results:
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
   line 942, in process_entire_key
       state, windowed_values, output_watermark):
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
   line 1098, in process_elements
       self.trigger_fn.on_element(value, window, context)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
   line 488, in on_element
       self.underlying.on_element(element, window, context)
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
   line 535, in on_element
       trigger.on_element(element, window, self._sub_context(context, ix))
    
   File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
   line 286, in on_element
       '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
     File
   
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
   line 728, in get_current_time
       return self._outer.get_current_time()
     File 
"C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py",
   line 702, in get_current_time
       return self._clock.time()
   AttributeError: 'NoneType' object has
   no attribute 'time' [while running 'combine all sharpe_ratio to 
list/CombinePerKey/GroupByKey/GroupByWindow']
   
   ```
   
   
   The composite trigger used in the window function is copied from the official documentation:
   https://beam.apache.org/documentation/programming-guide/#composite-triggers
   Please refer to the relevant pipeline source code below:
   ```
   
   windowing = beam.WindowInto(FixedWindows(1 * 60),
                               trigger=Repeatedly(AfterAny(AfterCount(100),
   AfterProcessingTime(delay=1 * 60))),
                               accumulation_mode=AccumulationMode.DISCARDING)
   valuesPCollection
   \
   | 'calculate sharpe_ratio' >> beam.FlatMap(fn_calculate_sharpe_ratio) \
   | 'window sharpe_ratio'
   >> windowing \
   | 'combine all sharpe_ratio to list' >> 
beam.CombineGlobally(CombineAllToListFn()).without_defaults()
   \
   | 'store sharpe_ratio' >> beam.FlatMap(store_metric_with_now_ts)
   
   ```
   
   
   
   
   Imported from Jira 
[BEAM-5132](https://issues.apache.org/jira/browse/BEAM-5132). Original Jira may 
contain additional context.
   Reported by: thangbui.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to