For runners that support @RequiresTimeSortedInput, all your input will arrive sorted by timestamp (as long as your elements' timestamps track the order that you want). For runners that don't support it, you need to build a stateful DoFn that buffers out-of-order events and reorders them into the order you need.
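Not from the original thread, but as a rough illustration, a minimal Java
SDK sketch of a DoFn using the annotation might look like this (the
KV<String, String> element type, the class name, and the pass-through body
are assumptions for the example):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class TimeSortedFn extends DoFn<KV<String, String>, KV<String, String>> {
  // On runners that support it, this annotation asks the runner to
  // deliver elements to processElement sorted by their event timestamps.
  @RequiresTimeSortedInput
  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      OutputReceiver<KV<String, String>> out) {
    // If your timestamps track Kafka offsets, elements arrive here in
    // offset order (per key).
    out.output(element);
  }
}

The sorting is per key and driven by the watermark, so the element
timestamps have to encode the order you actually care about.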
@Pablo Estrada <[email protected]> Any other suggestions for supporting CDC type pipelines?

On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong <[email protected]> wrote:

> Thanks a lot for the response!
>
> We have several business use cases that rely strongly on ordering by
> Kafka offset:
>
> 1) Streaming unwindowed inner join: say we want to join users with
> reviews on user_id. Here are the schemas for the two streams:
>
> user:
>   - user_id
>   - name
>   - timestamp
>
> reviews:
>   - review_id
>   - user_id
>   - timestamp
>
> Here are the messages in each stream, ordered by Kafka offset:
>
> user: (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
> reviews: (ABC, 1, 90), (DEF, 2, 360)
>
> I would expect to receive the following output messages:
>
> (1, name_a, ABC) at timestamp 90
> (1, name_c, ABC) at timestamp 240
> (2, name_b, DEF) at timestamp 360
>
> This can be done in native Flink, since the Flink Kafka consumer reads
> from each partition sequentially. But without an ordering guarantee, we
> can end up with arbitrary results. So how would we implement this in
> Beam?
>
> 2) Unwindowed aggregation: aggregate all the employees for every
> organization. Say we have a new-employee stream with the following
> schema:
>
> new_employee:
>   - organization_id
>   - employee_name
>
> And here are the messages, ordered by Kafka offset:
>
> (1, name_a), (2, name_b), (2, name_c), (1, name_d)
>
> I would expect the output to be:
>
> (1, [name_a]), (2, [name_b]), (2, [name_b, name_c]), (1, [name_a, name_d])
>
> Again, without an ordering guarantee the result is non-deterministic.
>
> Change data capture (CDC) streams are a very common use case for our
> data pipelines. As in the examples above, we rely on Kafka offsets to
> make sure we process data mutations in the proper order. While in some
> cases we have Flink-native solutions to these problems (Flink provides
> ordering guarantees within the chosen key), we are now building some
> new Beam applications that would require ordering guarantees. What is
> the recommended approach in Beam for such use cases? If this isn't
> currently supported, is there any near-term plan to add native ordering
> support in Beam?
>
> On 2020/06/09 20:37:22, Luke Cwik <[email protected]> wrote:
>
> > This will likely break due to:
> >
> > * workers can have more than one thread and hence process the source
> >   in parallel
> > * splitting a source allows for the source to be broken up into
> >   multiple restrictions, and hence the runner can process those
> >   restrictions in any order it wants (let's say your Kafka partition
> >   has an unconsumed commit offset range of [20, 100); this could be
> >   split into [20, 60) and [60, 100), and the [60, 100) offset range
> >   could be processed first)
> >
> > You're right that you need to sort the output however you want within
> > your DoFn before you make external calls to Kafka (this prevents you
> > from using the KafkaIO sink implementation as a transform). There is
> > an annotation, @RequiresTimeSortedInput, which is a special case for
> > this sorting if you want it sorted by the elements' timestamps, but
> > you'll still need to write to Kafka directly yourself from your DoFn.
> >
> > On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang <[email protected]> wrote:
> >
> > > We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
> > > messages originate from a custom source that consumes messages from
> > > a Kafka topic and emits them in the order of their Kafka offsets to
> > > a DoFn.
> > > After this DoFn processes the messages, they are emitted to a
> > > custom sink that sends messages to a Kafka topic.
> > >
> > > We want to process those messages in the order in which we receive
> > > them from Kafka and then emit them to the Kafka sink in the same
> > > order, but based on our understanding Beam does not provide an
> > > in-order transport. However, in practice we noticed that with a
> > > Python SDK worker on Flink, a parallelism setting of 1, and one
> > > sdk_worker instance, messages seem to be both processed and emitted
> > > in order. Is that implementation-specific in-order behavior
> > > something that we can rely on, or is it very likely that this will
> > > break at some future point?
> > >
> > > In case it's not recommended to depend on that behavior, what is
> > > the best approach for in-order processing?
> > >
> > > https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
> > > recommends ordering events in a heap, but according to our
> > > understanding that approach will only work when writing directly to
> > > an external system.
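For reference, here is a minimal sketch (Java SDK, not from the thread) of
the buffer-and-reorder stateful DoFn suggested at the top: events are held
in per-key state and released in timestamp order once the watermark has
passed them. The String payload type and the "buffer"/"minPending"/"flush"
names are illustrative assumptions, and the sketch assumes no elements
arrive later than the watermark:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class BufferAndReorderFn extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<String>>> bufferSpec =
      StateSpecs.bag();

  @StateId("minPending")
  private final StateSpec<ValueState<Instant>> minPendingSpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @Timestamp Instant ts,
      @StateId("buffer") BagState<TimestampedValue<String>> buffer,
      @StateId("minPending") ValueState<Instant> minPending,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(element.getValue(), ts));
    Instant min = minPending.read();
    if (min == null || ts.isBefore(min)) {
      // (Re)arm the timer for the earliest buffered timestamp; it fires
      // once the watermark guarantees nothing earlier can still arrive.
      minPending.write(ts);
      flush.set(ts);
    }
  }

  @OnTimer("flush")
  public void onFlush(
      @Timestamp Instant fireTs,
      @StateId("buffer") BagState<TimestampedValue<String>> buffer,
      @StateId("minPending") ValueState<Instant> minPending,
      @TimerId("flush") Timer flush,
      OutputReceiver<String> out) {
    List<TimestampedValue<String>> ready = new ArrayList<>();
    List<TimestampedValue<String>> pending = new ArrayList<>();
    for (TimestampedValue<String> tv : buffer.read()) {
      (tv.getTimestamp().isAfter(fireTs) ? pending : ready).add(tv);
    }
    // Emit everything the watermark has passed, in timestamp order.
    ready.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<String> tv : ready) {
      out.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
    }
    // Re-buffer the rest and re-arm the timer for the next timestamp.
    buffer.clear();
    minPending.clear();
    if (!pending.isEmpty()) {
      Instant next = pending.get(0).getTimestamp();
      for (TimestampedValue<String> tv : pending) {
        buffer.add(tv);
        if (tv.getTimestamp().isBefore(next)) {
          next = tv.getTimestamp();
        }
      }
      minPending.write(next);
      flush.set(next);
    }
  }
}

Each firing releases only the elements whose timestamps the watermark has
passed and then re-arms itself for the next buffered timestamp, so output
for a given key is emitted in event-time order; as mentioned above, you
would still have to write to Kafka yourself from such a DoFn rather than
going through the KafkaIO sink.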
