I’m also interested in the answer to this. This is essential for reading from many types of data sources.

On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne <samb...@gmail.com> wrote:

> +dev to see if anyone has any suggestions.
>
> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <samb...@gmail.com> wrote:
>
>> Hello beam community!
>>
>> I’m having trouble coming up with the best pattern to *eagerly* poll. By
>> eagerly, I mean that elements should be consumed and yielded as soon as
>> possible. There are a handful of experiments that I’ve tried and my latest
>> attempt using the timer API seems quite promising, but is operating in a
>> way that I find rather unintuitive. My solution was to create a sort of
>> recursive timer callback - which I found one example
>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
>> of within the beam test code.
>>
>> I have a few questions:
>>
>> 1) The below code runs fine with a single worker but with multiple
>> workers there are duplicate values. It seems that the callback and snapshot
>> of the state is provided to multiple workers and the number of duplications
>> increases with the number of workers. Is this due to the values being
>> provided to timer.set?
>>
>> 2) I’m using TimeDomain.WATERMARK here due to it simply not working when
>> using REAL_TIME. The docs
>> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
>> seem to suggest REAL_TIME would be the way to do this, however there
>> seems to be no guarantee that a REAL_TIME callback will run. In this
>> sample setting the timer to REAL_TIME will simply not ever fire the
>> callback. Interestingly, if you call timer.set with any value less than
>> the current time.time(), then the callback will run, however it seems to
>> fire immediately regardless of the value (and in this sample will actually
>> raise an AssertionError
>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
>> ).
>>
>> I’m happy for suggestions!
>> -Sam
>>
>> import random
>> import threading
>>
>> import apache_beam as beam
>> import apache_beam.coders as coders
>> import apache_beam.transforms.combiners as combiners
>> import apache_beam.transforms.userstate as userstate
>> import apache_beam.utils.timestamp as timestamp
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>>
>> class Log(beam.PTransform):
>>
>>     lock = threading.Lock()
>>
>>     @classmethod
>>     def _log(cls, element, label):
>>         with cls.lock:
>>             # This just colors the print in terminal
>>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>>         return element
>>
>>     def expand(self, pcoll):
>>         return pcoll | beam.Map(self._log, self.label)
>>
>>
>> class EagerProcess(beam.DoFn):
>>
>>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>>
>>     def process(
>>         self,
>>         element,
>>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         _, item = element
>>
>>         for i in range(item):
>>             buffer.add(i)
>>
>>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>     @userstate.on_timer(POLL_TIMER)
>>     def flush(
>>         self,
>>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         cache = buffer.read()
>>         buffer.clear()
>>
>>         requeue = False
>>         for item in cache:
>>             if random.random() < 0.1:
>>                 yield item
>>             else:
>>                 buffer.add(item)
>>                 requeue = True
>>
>>         if requeue:
>>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>
>> def main():
>>     options = PipelineOptions.from_dictionary({
>>         'direct_num_workers': 3,
>>         'direct_running_mode': 'multi_threading',
>>     })
>>
>>     pipe = beam.Pipeline(options=options)
>>     (
>>         pipe
>>         | beam.Create([10])
>>         | 'Init' >> Log()
>>         | beam.Reify.Timestamp()
>>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>>         | beam.ParDo(EagerProcess())
>>         | 'Complete' >> Log()
>>         | beam.transforms.combiners.Count.Globally()
>>         | 'Count' >> Log()
>>     )
>>     result = pipe.run()
>>     result.wait_until_finish()
>>
>>
>> if __name__ == '__main__':
>>     main()
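
For comparing against the duplicated output described in question 1, it may help to pin down what the buffer-and-requeue logic in flush should produce when each firing runs exactly once. The following is only a plain-Python simulation of that loop, not Beam code: the names flush_once and run_until_drained are hypothetical, a list stands in for the bag state, and a while loop stands in for re-setting the timer. It assumes nothing about how a runner schedules timers.

```python
import random


def flush_once(buffer, rng, emit_probability=0.1):
    """Simulate one timer firing: emit some items, keep the rest buffered."""
    emitted, remaining = [], []
    for item in buffer:
        if rng.random() < emit_probability:
            emitted.append(item)    # corresponds to `yield item`
        else:
            remaining.append(item)  # corresponds to `buffer.add(item)`
    return emitted, remaining


def run_until_drained(n, seed=0):
    """Keep 'firing' while items remain, as the requeue flag does in flush."""
    rng = random.Random(seed)
    buffer = list(range(n))  # what process() adds for an input of n
    output = []
    firings = 0
    while buffer:  # a non-empty buffer is the requeue condition
        emitted, buffer = flush_once(buffer, rng)
        output.extend(emitted)
        firings += 1
    return output, firings


output, firings = run_until_drained(10)
print(sorted(output), firings)
```

Under these assumptions every value from 0 to 9 appears exactly once, in some order, so a final count of 10 is the baseline; any larger count from the real pipeline would mean a buffered item was emitted by more than one firing.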