[
https://issues.apache.org/jira/browse/BEAM-10854?focusedWorklogId=577784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-577784
]
ASF GitHub Bot logged work on BEAM-10854:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Apr/21 17:42
Start Date: 06/Apr/21 17:42
Worklog Time Spent: 10m
Work Description: InigoSJ commented on a change in pull request #14446:
URL: https://github.com/apache/beam/pull/14446#discussion_r608055755
##########
File path: sdks/python/apache_beam/transforms/periodicsequence.py
##########
@@ -37,6 +37,11 @@
class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
def initial_restriction(self, element):
start, end, interval = element
+ if isinstance(start, Timestamp):
Review comment:
I guess that would also work; I just didn't want to take away the option of
using other types, since some users may already be relying on them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 577784)
Time Spent: 40m (was: 0.5h)
> PeriodicImpulse default arguments are not valid values
> ------------------------------------------------------
>
> Key: BEAM-10854
> URL: https://issues.apache.org/jira/browse/BEAM-10854
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.23.0
> Reporter: Patrick Madsen
> Priority: P3
> Labels: Periodic
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Based on the examples from [Slowly updating side input using
> windowing|https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing]
> I attempted to test PeriodicImpulse without passing start_timestamp or
> stop_timestamp, so that it would trigger immediately and run forever.
> The code below shows how:
> {code:python}
> def pair_account_ids(
> api_key: str, account_ids: Dict[str, str]
> ) -> Optional[Tuple[str, str, int]]:
> if api_key not in account_ids:
> return None
> return (api_key, account_ids[api_key], int(time.time()))
> def echo(elm) -> Dict[str, str]:
> print(elm)
> return elm
> def api_keys(elm) -> Dict[str, str]:
> return {"<api_key_1>": "<account_id_1>", "<api_key_2>":
> "<account_id_2>"}
> pipeline_options = PipelineOptions(streaming=True)
> with TestPipeline(
> options=pipeline_options, runner=beam.runners.DirectRunner()
> ) as p:
> side_input = (
> p
> | "PeriodicImpulse"
> >> PeriodicImpulse(
> # start_timestamp=start,
> # stop_timestamp=stop,
> fire_interval=5,
> apply_windowing=True,
> )
> | "api_keys" >> beam.Map(api_keys)
> )
> main_input = (
> p
> | "MpImpulse"
> >> beam.Create(["<api_key_1>", "<api_key_2>",
> "<unknown_api_key>"])
> | "MapMpToTimestamped"
> >> beam.Map(lambda src: TimestampedValue(src, time.time()))
> | "WindowMpInto" >>
> beam.WindowInto(beam.window.FixedWindows(5))
> )
> result = (
> main_input
> | "Pair with AccountIDs"
> >> beam.Map(
> pair_account_ids,
> account_ids=beam.pvalue.AsSingleton(side_input)
> )
> | "filter" >> beam.Filter(lambda x: x is not None)
> | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}:
> {x}"))
> )
> print(f"done: {int(time.time())}")
> {code}
> However, the above pipeline throws the following exception:
> {code}
> Traceback (most recent call last):
> File "/test/not_test.py", line 141, in test_side_input
> | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
> File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
> 555, in __exit__
> self.run().wait_until_finish()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/testing/test_pipeline.py",
> line 112, in run
> False if self.not_use_test_runner_api else test_runner_api))
> File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
> 521, in run
> allow_proto_holders=True).run(False)
> File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
> 534, in run
> return self.runner.run_pipeline(self, self._options)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
> line 119, in run_pipeline
> return runner.run_pipeline(pipeline, options)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 173, in run_pipeline
> pipeline.to_runner_api(default_environment=self._default_environment))
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 183, in run_via_runner_api
> return self.run_stages(stage_context, stages)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 340, in run_stages
> bundle_context_manager,
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 519, in _run_stage
> bundle_manager)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 557, in _run_bundle
> data_input, data_output, input_timers, expected_timer_output)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 941, in process_bundle
> timer_inputs)):
> File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 598, in
> result_iterator
> yield fs.pop().result()
> File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in
> result
> return self.__get_result()
> File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in
> __get_result
> raise self._exception
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py",
> line 44, in run
> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 937, in execute
> dry_run)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> line 837, in process_bundle
> result_future = self._worker_handler.control_conn.push(process_bundle_req)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
> line 352, in push
> response = self.worker.do_instruction(request)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 515, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 978, in process_bundle
> element.data)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 218, in process_encoded
> self.output(decoded_value)
> File "apache_beam/runners/worker/operations.py", line 330, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 332, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 195, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam/runners/worker/operations.py", line 755, in
> apache_beam.runners.worker.operations.SdfProcessSizedElements.process
> File "apache_beam/runners/worker/operations.py", line 764, in
> apache_beam.runners.worker.operations.SdfProcessSizedElements.process
> File "apache_beam/runners/common.py", line 971, in
> apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
> File "apache_beam/runners/common.py", line 711, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process
> File "apache_beam/runners/common.py", line 807, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
> File "apache_beam/runners/common.py", line 1095, in
> apache_beam.runners.common._OutputProcessor.process_outputs
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/periodicsequence.py",
> line 124, in process
> timestamp.Timestamp(current_output_timestamp))
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/utils/timestamp.py", line
> 64, in __init__
> 'Cannot interpret %s %s as seconds.' % (seconds, type(seconds)))
> TypeError: Cannot interpret Timestamp(1599216802.136201) <class
> 'apache_beam.utils.timestamp.Timestamp'> as seconds.
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)