Minbo Bae created BEAM-11883:
--------------------------------
Summary: Unsupported operand type with PeriodicImpulse
Key: BEAM-11883
URL: https://issues.apache.org/jira/browse/BEAM-11883
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Affects Versions: 2.28.0
Reporter: Minbo Bae
{{PeriodicImpluse}} throws {{TypeError}} if {{Timetamp}} is used as
{{start_timestamp}} and {{stop_timestamp parameters}}.
With the following example,
{code:java}
import logging
from apache_beam import ParDo
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse
def run(argv=None):
options = PipelineOptions(argv)
with Pipeline(options=options) as p:
(p
| PeriodicImpulse() # By default,
# start_timestamp=Timestamp.now(),
# stop_timestamp=MAX_TIMESTAMP,
# fire_interval=360.0,
| ParDo(lambda x: logging.info('element: %s', x))
)
if __name__ == '__main__':
run()
{code}
Running with DirectRunner fails with the following stacktrace.
{noformat}
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1426, in process
element)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/transforms/periodicsequence.py",
line 42, in initial_restriction
total_outputs = math.ceil((end - start) / interval)
TypeError: unsupported operand type(s) for /: 'Duration' and 'float'During
handling of the above exception, another exception occurred:Traceback (most
recent call last):
File
"/Users/baeminbo/Documents/workspace/dataflow-python/periodic_impulse_pipeline.py",
line 22, in <module>
run()
File
"/Users/baeminbo/Documents/workspace/dataflow-python/periodic_impulse_pipeline.py",
line 18, in run
| ParDo(lambda x: logging.info('element: %s', x))
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/pipeline.py",
line 580, in __exit__
self.result = self.run()
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/pipeline.py",
line 559, in run
return self.runner.run_pipeline(self, self._options)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/direct/direct_runner.py",
line 133, in run_pipeline
return runner.run_pipeline(pipeline, options)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 183, in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 193, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 359, in run_stages
bundle_context_manager,
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 555, in _run_stage
bundle_manager)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 595, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 896, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
line 380, in push
response = self.worker.do_instruction(request)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1000, in process_bundle
element.data)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 228, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 357, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 359, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/future/utils/__init__.py",
line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1426, in process
element)
File
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/transforms/periodicsequence.py",
line 42, in initial_restriction
total_outputs = math.ceil((end - start) / interval)
TypeError: unsupported operand type(s) for /: 'Duration' and 'float' [while
running 'PeriodicImpulse/GenSequence/PairWithRestriction']Process finished with
exit code 1
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)