+dev to see if anyone has any suggestions.

On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <samb...@gmail.com> wrote:
> Hello Beam community!
>
> I’m having trouble coming up with the best pattern to *eagerly* poll. By
> eagerly, I mean that elements should be consumed and yielded as soon as
> possible. I’ve tried a handful of experiments, and my latest attempt using
> the timer API seems quite promising, but it operates in a way that I find
> rather unintuitive. My solution was to create a sort of recursive timer
> callback, for which I found one example
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
> within the Beam test code.
>
> I have a few questions:
>
> 1) The code below runs fine with a single worker, but with multiple
> workers there are duplicate values. It seems that the callback and a
> snapshot of the state are provided to multiple workers, and the number of
> duplicates increases with the number of workers. Is this due to the values
> being provided to timer.set?
>
> 2) I’m using TimeDomain.WATERMARK here because it simply does not work
> with REAL_TIME. The docs
> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
> seem to suggest REAL_TIME would be the way to do this; however, there
> seems to be no guarantee that a REAL_TIME callback will run. In this
> sample, setting the timer to REAL_TIME never fires the callback.
> Interestingly, if you call timer.set with any value less than the current
> time.time(), the callback does run, but it seems to fire immediately
> regardless of the value (and in this sample will actually raise an
> AssertionError
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
> ).
>
> I’m happy for suggestions!
> -Sam
>
> import random
> import threading
>
> import apache_beam as beam
> import apache_beam.coders as coders
> import apache_beam.transforms.combiners as combiners
> import apache_beam.transforms.userstate as userstate
> import apache_beam.utils.timestamp as timestamp
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> class Log(beam.PTransform):
>
>     lock = threading.Lock()
>
>     @classmethod
>     def _log(cls, element, label):
>         with cls.lock:
>             # This just colors the print in terminal
>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>         return element
>
>     def expand(self, pcoll):
>         return pcoll | beam.Map(self._log, self.label)
>
>
> class EagerProcess(beam.DoFn):
>
>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>
>     def process(
>         self,
>         element,
>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         _, item = element
>
>         for i in range(item):
>             buffer.add(i)
>
>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>     @userstate.on_timer(POLL_TIMER)
>     def flush(
>         self,
>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         cache = buffer.read()
>         buffer.clear()
>
>         requeue = False
>         for item in cache:
>             if random.random() < 0.1:
>                 yield item
>             else:
>                 buffer.add(item)
>                 requeue = True
>
>         if requeue:
>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>
> def main():
>     options = PipelineOptions.from_dictionary({
>         'direct_num_workers': 3,
>         'direct_running_mode': 'multi_threading',
>     })
>
>     pipe = beam.Pipeline(options=options)
>     (
>         pipe
>         | beam.Create([10])
>         | 'Init' >> Log()
>         | beam.Reify.Timestamp()
>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>         | beam.ParDo(EagerProcess())
>         | 'Complete' >> Log()
>         | beam.transforms.combiners.Count.Globally()
>         | 'Count' >> Log()
>     )
>     result = pipe.run()
>     result.wait_until_finish()
>
>
> if __name__ == '__main__':
>     main()
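
For anyone skimming the thread, the intended behaviour of the re-arming ("recursive") timer in the quoted flush callback can be sketched in plain Python. This is only a single-threaded simulation under stated assumptions: it uses no Beam APIs, the function name simulate_rearming_timer and its parameters are hypothetical, and it does not model runner scheduling or watermarks. It only shows that, for one key, each buffered item should be emitted exactly once; it says nothing about why the multi-worker run produces duplicates.

```python
import random


def simulate_rearming_timer(items, emit_probability=0.1, delay=10, seed=0):
    """Simulate a timer callback that re-arms itself until its buffer drains.

    Plain Python only; this does not use or model Beam runner semantics.
    Returns a list of (fire_time, item) pairs in emission order.
    """
    if emit_probability <= 0:
        raise ValueError('emit_probability must be positive or the loop never ends')

    rng = random.Random(seed)   # seeded so repeated runs give the same result
    buffer = list(items)        # stands in for the bag state
    next_fire = delay           # stands in for the first timer.set(...) in process
    emitted = []

    while next_fire is not None:
        now = next_fire
        next_fire = None                 # the timer has fired; nothing pending
        cache, buffer = buffer, []       # read the state, then clear it
        for item in cache:
            if rng.random() < emit_probability:
                emitted.append((now, item))
            else:
                buffer.append(item)      # keep the item for the next firing
        if buffer:
            next_fire = now + delay      # re-arm only while work remains

    return emitted


if __name__ == '__main__':
    for fire_time, item in simulate_rearming_timer(range(10)):
        print(fire_time, item)
```

In this model the timer is re-armed only from inside the callback and only while the buffer is non-empty, which is the same shape as the requeue flag in the quoted flush method.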