Patrick Madsen created BEAM-10854:
-------------------------------------

             Summary: PeriodicImpulse default arguments are not valid values
                 Key: BEAM-10854
                 URL: https://issues.apache.org/jira/browse/BEAM-10854
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.23.0
            Reporter: Patrick Madsen


Based on the example in [Slowly updating side input using windowing|https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing],
I attempted to test PeriodicImpulse without passing start_timestamp or
stop_timestamp, so that it would fire immediately and run forever.

The code below reproduces the problem:

{code:python}
        def pair_account_ids(
            api_key: str, account_ids: Dict[str, str]
        ) -> Optional[Tuple[str, str, int]]:
            """Pair an API key with its account id and the current epoch second.

            Returns ``None`` when ``api_key`` has no entry in ``account_ids``,
            so the caller can filter unknown keys out afterwards.
            """
            if api_key in account_ids:
                account_id = account_ids[api_key]
                return api_key, account_id, int(time.time())
            return None

        def echo(elm) -> Dict[str, str]:
            """Debug helper: print the element, then hand it back unchanged."""
            passthrough = elm
            print(passthrough)
            return passthrough

        def api_keys(elm) -> Dict[str, str]:
            """Return a fixed placeholder API-key -> account-id mapping.

            ``elm`` (the element emitted by PeriodicImpulse) is ignored; every
            call builds and returns a new dict with the same two entries.
            """
            mapping = {
                "<api_key_1>": "<account_id_1>",
                "<api_key_2>": "<account_id_2>",
            }
            return mapping

        # Reproducer for BEAM-10854: PeriodicImpulse is applied with its default
        # start_timestamp / stop_timestamp (the explicit arguments are commented
        # out below). On Beam 2.23.0 this fails inside periodicsequence.py with
        # "TypeError: Cannot interpret Timestamp(...) ... as seconds.".
        pipeline_options = PipelineOptions(streaming=True)
        with TestPipeline(
            options=pipeline_options, runner=beam.runners.DirectRunner()
        ) as p:
            # Side input: the api_keys mapping, re-emitted on every
            # PeriodicImpulse firing.
            side_input = (
                p
                | "PeriodicImpulse"
                >> PeriodicImpulse(
                    # Left at their defaults on purpose: that is what triggers
                    # the failure being reported.
                    # NOTE(review): the traceback shows Timestamp(...) being
                    # called on a value that is already a Timestamp; passing
                    # start_timestamp as a plain float (e.g. time.time()) might
                    # sidestep it -- unverified.
                    # start_timestamp=start,
                    # stop_timestamp=stop,
                    fire_interval=5,
                    apply_windowing=True,
                )
                | "api_keys" >> beam.Map(api_keys)
            )

            # Main input: three keys stamped with the current wall-clock time
            # and assigned to FixedWindows(5).
            main_input = (
                p
                | "MpImpulse"
                >> beam.Create(["<api_key_1>", "<api_key_2>", "<unknown_api_key>"])
                | "MapMpToTimestamped"
                >> beam.Map(lambda src: TimestampedValue(src, time.time()))
                | "WindowMpInto" >> beam.WindowInto(beam.window.FixedWindows(5))
            )

            # Look each key up in the side-input mapping; pair_account_ids
            # returns None for unknown keys, which the filter step drops.
            # The f-string below was split across two lines by email wrapping,
            # which made the snippet a SyntaxError; it is rejoined here.
            result = (
                main_input
                | "Pair with AccountIDs"
                >> beam.Map(
                    pair_account_ids,
                    account_ids=beam.pvalue.AsSingleton(side_input),
                )
                | "filter" >> beam.Filter(lambda x: x is not None)
                | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
            )
        print(f"done:  {int(time.time())}")
{code}

However, the pipeline above fails with the following exception:


{code}
Traceback (most recent call last):
  File "/test/not_test.py", line 141, in test_side_input
    | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 
555, in __exit__
    self.run().wait_until_finish()
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/testing/test_pipeline.py", 
line 112, in run
    False if self.not_use_test_runner_api else test_runner_api))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 
521, in run
    allow_proto_holders=True).run(False)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 
534, in run
    return self.runner.run_pipeline(self, self._options)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
 line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 173, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 183, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 340, in run_stages
    bundle_context_manager,
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 519, in _run_stage
    bundle_manager)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 557, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 941, in process_bundle
    timer_inputs)):
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 598, in 
result_iterator
    yield fs.pop().result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in 
result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in 
__get_result
    raise self._exception
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py",
 line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 937, in execute
    dry_run)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 837, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
 line 352, in push
    response = self.worker.do_instruction(request)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 978, in process_bundle
    element.data)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 755, in 
apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 764, in 
apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 971, in 
apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 711, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 807, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/periodicsequence.py", line 124, in process
    timestamp.Timestamp(current_output_timestamp))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/timestamp.py", line 64, in __init__
    'Cannot interpret %s %s as seconds.' % (seconds, type(seconds)))
TypeError: Cannot interpret Timestamp(1599216802.136201) <class 'apache_beam.utils.timestamp.Timestamp'> as seconds.
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to