+dev to see if anyone has any suggestions.

On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <samb...@gmail.com> wrote:

> Hello beam community!
>
> I’m having trouble coming up with the best pattern to *eagerly* poll. By
> eagerly, I mean that elements should be consumed and yielded as soon as
> possible. I’ve tried a handful of experiments, and my latest attempt using
> the timer API seems quite promising but behaves in a way that I find
> rather unintuitive. My solution was to create a sort of recursive timer
> callback, a pattern I found one example of
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
> in the Beam test code.
>
> I have a few questions:
>
> 1) The code below runs fine with a single worker, but with multiple workers
> there are duplicate values. It seems that the callback and a snapshot of the
> state are provided to multiple workers, and the number of duplicates
> increases with the number of workers. Is this due to the values being
> provided to timer.set?
>
> 2) I’m using TimeDomain.WATERMARK here because REAL_TIME simply does not
> work. The docs
> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
> seem to suggest REAL_TIME would be the way to do this, but there
> seems to be no guarantee that a REAL_TIME callback will run. In this
> sample, setting the timer to REAL_TIME never fires the callback.
> Interestingly, if you call timer.set with any value less than
> the current time.time(), the callback does run, but it seems to
> fire immediately regardless of the value (and in this sample will actually
> raise an AssertionError
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
> ).
>
> I’m happy for suggestions!
> -Sam
>
> import random
> import threading
>
> import apache_beam as beam
> import apache_beam.coders as coders
> import apache_beam.transforms.combiners as combiners
> import apache_beam.transforms.userstate as userstate
> import apache_beam.utils.timestamp as timestamp
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> class Log(beam.PTransform):
>
>     lock = threading.Lock()
>
>     @classmethod
>     def _log(cls, element, label):
>         with cls.lock:
>             # This just colors the print in terminal
>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>         return element
>
>     def expand(self, pcoll):
>         return pcoll | beam.Map(self._log, self.label)
>
>
> class EagerProcess(beam.DoFn):
>
>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>
>     def process(
>             self,
>             element,
>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         _, item = element
>
>         for i in range(item):
>             buffer.add(i)
>
>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>     @userstate.on_timer(POLL_TIMER)
>     def flush(
>             self,
>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         cache = buffer.read()
>         buffer.clear()
>
>         requeue = False
>         for item in cache:
>             if random.random() < 0.1:
>                 yield item
>             else:
>                 buffer.add(item)
>                 requeue = True
>
>         if requeue:
>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>
> def main():
>     options = PipelineOptions.from_dictionary({
>         'direct_num_workers': 3,
>         'direct_running_mode': 'multi_threading',
>     })
>
>     pipe = beam.Pipeline(options=options)
>     (
>         pipe
>         | beam.Create([10])
>         | 'Init' >> Log()
>         | beam.Reify.Timestamp()
>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>         | beam.ParDo(EagerProcess())
>         | 'Complete' >> Log()
>         | beam.transforms.combiners.Count.Globally()
>         | 'Count' >> Log()
>     )
>     result = pipe.run()
>     result.wait_until_finish()
>
>
> if __name__ == '__main__':
>     main()
>
>

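For reference, the re-arming logic in flush can be modeled without Beam. The sketch below is plain Python, not Beam API: flush and poll_until_empty are hypothetical helpers, a list stands in for the bag state, and a while loop stands in for the timer being set again. Under those assumptions each buffered element is emitted exactly once, which matches the single-worker behavior described above. It says nothing about how a runner distributes state and timers across workers.

```python
import random


def flush(buffer, rng, emit_probability=0.1):
    """Model one timer firing: emit some items, keep the rest buffered.

    Returns (emitted, remaining). A non-empty `remaining` list means the
    timer would be set again, mirroring the `requeue` flag in the sample.
    """
    emitted, remaining = [], []
    for item in buffer:
        if rng.random() < emit_probability:
            emitted.append(item)
        else:
            remaining.append(item)
    return emitted, remaining


def poll_until_empty(items, seed=0):
    """Repeat flush until nothing is left, counting the simulated firings."""
    rng = random.Random(seed)  # seeded only to make the sketch repeatable
    buffer = list(items)
    output = []
    firings = 0
    while buffer:  # stands in for the timer re-arming itself
        emitted, buffer = flush(buffer, rng)
        output.extend(emitted)
        firings += 1
    return output, firings


output, firings = poll_until_empty(range(10))
print(sorted(output), firings)
```

If a real pipeline emits more elements than this model does for the same input, the extra elements come from how the runner schedules the callback rather than from the requeue logic itself.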