> The code below runs fine with a single worker, but with multiple workers
> there are duplicate values.
> I’m using TimeDomain.WATERMARK here because it simply does not work when
> using REAL_TIME. The docs seem to suggest REAL_TIME would be the way to do
> this; however, there seems to be no guarantee that a REAL_TIME callback
> will run.

It seems that you are using the Python direct runner for experimentation.
Streaming support in the Python direct runner is currently rather limited
(https://github.com/apache/beam/issues/21987), so it is possible that the
direct runner doesn't correctly implement the streaming semantics. It
sounds like we should identify whether this is a problem in the SDK or in
the DirectRunner implementation, and file issues accordingly; streaming
direct runner issues are tracked under that umbrella issue. I would also
experiment with FlinkRunner or DataflowRunner. The streaming semantics
should also be consistent across SDKs, so different behavior between the
Python and Java SDKs would point to an SDK bug.
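
If it helps, here is a minimal sketch of how the same pipeline options could
be pointed at Flink instead of the direct runner (the environment_type and
flink_master choices here are assumptions that depend on your local Flink
setup):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions.from_dictionary({
        'runner': 'FlinkRunner',
        'streaming': True,
        # LOOPBACK keeps the Python SDK workers in the launching process,
        # which is convenient for local testing; omitting flink_master lets
        # Beam start an embedded local Flink cluster.
        'environment_type': 'LOOPBACK',
    })

If the duplicates disappear on another runner, that would point at the
DirectRunner rather than the SDK.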


On Thu, Jun 22, 2023 at 10:00 AM Chad Dombrova <chad...@gmail.com> wrote:

> I’m also interested in the answer to this.  This is essential for reading
> from many types of data sources.
>
>
> On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne <samb...@gmail.com> wrote:
>
>> +dev to see if anyone has any suggestions.
>>
>> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <samb...@gmail.com> wrote:
>>
>>> Hello beam community!
>>>
>>> I’m having trouble coming up with the best pattern to *eagerly* poll.
>>> By eagerly, I mean that elements should be consumed and yielded as soon as
>>> possible. There are a handful of experiments that I’ve tried, and my latest
>>> attempt using the timer API seems quite promising, but it operates in a
>>> way that I find rather unintuitive. My solution was to create a sort of
>>> recursive timer callback, an example of which I found
>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
>>> in the beam test code.
>>>
>>> I have a few questions:
>>>
>>> 1) The code below runs fine with a single worker, but with multiple
>>> workers there are duplicate values. It seems that the callback and a snapshot
>>> of the state are provided to multiple workers, and the number of duplicates
>>> increases with the number of workers. Is this due to the values being
>>> provided to timer.set?
>>>
>>> 2) I’m using TimeDomain.WATERMARK here because it simply does not work
>>> when using REAL_TIME. The docs
>>> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
>>> seem to suggest REAL_TIME would be the way to do this; however, there
>>> seems to be no guarantee that a REAL_TIME callback will run. In this
>>> sample, setting the timer to REAL_TIME simply never fires the
>>> callback. Interestingly, if you call timer.set with any value less than
>>> the current time.time(), then the callback does run; however, it seems
>>> to fire immediately regardless of the value (and in this sample it will
>>> actually raise an AssertionError
>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
>>> ).
>>>
>>> I’m happy for suggestions!
>>> -Sam
>>>
>>> import random
>>> import threading
>>>
>>> import apache_beam as beam
>>> import apache_beam.coders as coders
>>> import apache_beam.transforms.combiners as combiners
>>> import apache_beam.transforms.userstate as userstate
>>> import apache_beam.utils.timestamp as timestamp
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>>
>>> class Log(beam.PTransform):
>>>
>>>     lock = threading.Lock()
>>>
>>>     @classmethod
>>>     def _log(cls, element, label):
>>>         with cls.lock:
>>>             # This just colors the print in terminal
>>>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>>>         return element
>>>
>>>     def expand(self, pcoll):
>>>         return pcoll | beam.Map(self._log, self.label)
>>>
>>>
>>> class EagerProcess(beam.DoFn):
>>>
>>>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>>>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>>>
>>>     def process(
>>>             self,
>>>             element,
>>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>>     ):
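>>>         # Buffer the integers 0..item-1 in bag state and arm the flush timer.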
>>>         _, item = element
>>>
>>>         for i in range(item):
>>>             buffer.add(i)
>>>
>>>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>>
>>>     @userstate.on_timer(POLL_TIMER)
>>>     def flush(
>>>             self,
>>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>>     ):
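>>>         # Drain the buffer: emit each item with ~10% probability and
>>>         # re-buffer the rest, re-arming the timer so this callback fires again.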
>>>         cache = buffer.read()
>>>         buffer.clear()
>>>
>>>         requeue = False
>>>         for item in cache:
>>>             if random.random() < 0.1:
>>>                 yield item
>>>             else:
>>>                 buffer.add(item)
>>>                 requeue = True
>>>
>>>         if requeue:
>>>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>>
>>>
>>> def main():
>>>     options = PipelineOptions.from_dictionary({
>>>         'direct_num_workers': 3,
>>>         'direct_running_mode': 'multi_threading',
>>>     })
>>>
>>>     pipe = beam.Pipeline(options=options)
>>>     (
>>>         pipe
>>>         | beam.Create([10])
>>>         | 'Init' >> Log()
>>>         | beam.Reify.Timestamp()
>>>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>>>         | beam.ParDo(EagerProcess())
>>>         | 'Complete' >> Log()
>>>         | beam.transforms.combiners.Count.Globally()
>>>         | 'Count' >> Log()
>>>     )
>>>     result = pipe.run()
>>>     result.wait_until_finish()
>>>
>>>
>>> if __name__ == '__main__':
>>>     main()
>>>
>>>
