I’m also interested in the answer to this.  This is essential for reading
from many types of data sources.


On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne <samb...@gmail.com> wrote:

> +dev to see if anyone has any suggestions.
>
> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <samb...@gmail.com> wrote:
>
>> Hello beam community!
>>
>> I’m having trouble coming up with the best pattern to *eagerly* poll. By
>> eagerly, I mean that elements should be consumed and yielded as soon as
>> possible. There are a handful of experiments that I’ve tried and my latest
>> attempt using the timer API seems quite promising, but is operating in a
>> way that I find rather unintuitive. My solution was to create a sort of
>> recursive timer callback; I found one example
>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
>> of this in the Beam test code.
>>
>> I have a few questions:
>>
>> 1) The code below runs fine with a single worker, but with multiple
>> workers there are duplicate values. It seems that the callback and a
>> snapshot of the state are provided to multiple workers, and the number of
>> duplicates increases with the number of workers. Is this due to the values
>> being provided to timer.set?
>>
>> 2) I’m using TimeDomain.WATERMARK here because the sample simply does not
>> work with REAL_TIME. The docs
>> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
>> seem to suggest REAL_TIME would be the way to do this; however, there
>> seems to be no guarantee that a REAL_TIME callback will run. In this
>> sample, a timer set with REAL_TIME never fires the callback.
>> Interestingly, if you call timer.set with any value less than the current
>> time.time(), then the callback does run, but it seems to fire immediately
>> regardless of the value (and in this sample it actually raises an
>> AssertionError
>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
>> ).
>>
>> I’m happy for suggestions!
>> -Sam
>>
>> import random
>> import threading
>>
>> import apache_beam as beam
>> import apache_beam.coders as coders
>> import apache_beam.transforms.combiners as combiners
>> import apache_beam.transforms.userstate as userstate
>> import apache_beam.utils.timestamp as timestamp
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>>
>> class Log(beam.PTransform):
>>
>>     lock = threading.Lock()
>>
>>     @classmethod
>>     def _log(cls, element, label):
>>         with cls.lock:
>>             # This just colors the print in terminal
>>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>>         return element
>>
>>     def expand(self, pcoll):
>>         return pcoll | beam.Map(self._log, self.label)
>>
>>
>> class EagerProcess(beam.DoFn):
>>
>>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>>
>>     def process(
>>             self,
>>             element,
>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         _, item = element
>>
>>         for i in range(item):
>>             buffer.add(i)
>>
>>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>     @userstate.on_timer(POLL_TIMER)
>>     def flush(
>>             self,
>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         cache = buffer.read()
>>         buffer.clear()
>>
>>         requeue = False
>>         for item in cache:
>>             if random.random() < 0.1:
>>                 yield item
>>             else:
>>                 buffer.add(item)
>>                 requeue = True
>>
>>         if requeue:
>>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>
>> def main():
>>     options = PipelineOptions.from_dictionary({
>>         'direct_num_workers': 3,
>>         'direct_running_mode': 'multi_threading',
>>     })
>>
>>     pipe = beam.Pipeline(options=options)
>>     (
>>         pipe
>>         | beam.Create([10])
>>         | 'Init' >> Log()
>>         | beam.Reify.Timestamp()
>>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>>         | beam.ParDo(EagerProcess())
>>         | 'Complete' >> Log()
>>         | combiners.Count.Globally()
>>         | 'Count' >> Log()
>>     )
>>     result = pipe.run()
>>     result.wait_until_finish()
>>
>>
>> if __name__ == '__main__':
>>     main()
>>
>>
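
For readers following the thread, the "recursive timer" idea in the quoted sample (read the buffer, emit what is ready, re-buffer the rest, and set the timer again) can be sketched without Beam. This is only a minimal single-process sketch under stated assumptions: `run_rearming_timer`, `interval`, `emit_probability`, and `seed` are hypothetical names, a plain list stands in for the bag state, and a simple counter stands in for the runner's clock. It does not model how a real runner schedules timers, and it says nothing about why duplicates appear with multiple workers.

```python
import random


def run_rearming_timer(items, interval=10, emit_probability=0.1, seed=0):
    """Simulate a timer callback that re-arms itself until its buffer is empty.

    Hypothetical stand-in for the state/timer pair in the quoted sample:
    `buffer` plays the role of the bag state and `next_fire` the role of
    the single pending timer.
    """
    rng = random.Random(seed)   # seeded so repeated runs behave the same
    buffer = list(items)        # stand-in for the bag state
    next_fire = interval        # first firing, as set in process()
    emitted = []                # (firing_time, item) pairs in emit order

    while next_fire is not None:
        now = next_fire
        next_fire = None

        # Read and clear the buffer, as flush() does in the sample.
        cache, buffer = buffer, []

        for item in cache:
            if rng.random() < emit_probability:
                emitted.append((now, item))   # "ready": emit downstream
            else:
                buffer.append(item)           # not ready: keep for later

        # Re-arm the timer only if something is still waiting.
        if buffer:
            next_fire = now + interval

    return emitted


if __name__ == '__main__':
    for fired_at, item in run_rearming_timer(range(10)):
        print(fired_at, item)
```

In this sketch every item is emitted exactly once, because only one callback ever reads the buffer at a time; that is the behaviour the single-worker run of the sample shows.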
