[ 
https://issues.apache.org/jira/browse/BEAM-11883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentyn Tymofieiev updated BEAM-11883:
---------------------------------------
    Fix Version/s: 2.30.0
       Resolution: Fixed
           Status: Resolved  (was: Open)

> Unsupported operand type with PeriodicImpulse in Python
> -------------------------------------------------------
>
>                 Key: BEAM-11883
>                 URL: https://issues.apache.org/jira/browse/BEAM-11883
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.28.0
>            Reporter: Minbo Bae
>            Priority: P3
>             Fix For: 2.30.0
>
>
> {{PeriodicImpluse}} throws {{TypeError}} if {{Timetamp}} is used as 
> {{start_timestamp}} and {{stop_timestamp parameters}}.
>  
> With the following example, 
>  
> {code:java}
> import logging
> from apache_beam import ParDo
> from apache_beam import Pipeline
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms.periodicsequence import PeriodicImpulse
> def run(argv=None):
>   options = PipelineOptions(argv)
>   with Pipeline(options=options) as p:
>     (p
>      | PeriodicImpulse()  # By default, 
>                           # start_timestamp=Timestamp.now(),
>                           # stop_timestamp=MAX_TIMESTAMP,
>                           # fire_interval=360.0,
>      | ParDo(lambda x: logging.info('element: %s', x))
>      )
> if __name__ == '__main__':
>   run()
> {code}
>  
> Running with DirectRunner fails with the following stacktrace.  
> {noformat}
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1426, in process
>     element)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/transforms/periodicsequence.py",
>  line 42, in initial_restriction
>     total_outputs = math.ceil((end - start) / interval)
> TypeError: unsupported operand type(s) for /: 'Duration' and 'float'During 
> handling of the above exception, another exception occurred:Traceback (most 
> recent call last):
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/periodic_impulse_pipeline.py",
>  line 22, in <module>
>     run()
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/periodic_impulse_pipeline.py",
>  line 18, in run
>     | ParDo(lambda x: logging.info('element: %s', x))
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/pipeline.py",
>  line 580, in __exit__
>     self.result = self.run()
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/pipeline.py",
>  line 559, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 133, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 183, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 359, in run_stages
>     bundle_context_manager,
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 555, in _run_stage
>     bundle_manager)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 595, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 896, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>  line 380, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1000, in process_bundle
>     element.data)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/future/utils/__init__.py",
>  line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1426, in process
>     element)
>   File 
> "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/transforms/periodicsequence.py",
>  line 42, in initial_restriction
>     total_outputs = math.ceil((end - start) / interval)
> TypeError: unsupported operand type(s) for /: 'Duration' and 'float' [while 
> running 'PeriodicImpulse/GenSequence/PairWithRestriction']Process finished 
> with exit code 1
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to