For runners that support @RequiresTimeSortedInput, your input will arrive
sorted by time, as long as each element's timestamp tracks the order that you
want.
For runners that don't support it, you need to build a stateful DoFn that
buffers out-of-order events and re-emits them in the order that you need.
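
To make the buffering idea concrete, here is a minimal sketch in plain Python
(deliberately not Beam code). It assumes each event carries a timestamp that
reflects the desired order, and that something tells you when no earlier events
can still arrive (called "watermark" here). The class and method names are
hypothetical. In a real stateful DoFn the buffer would live in per-key state and
the flush would be driven by an event-time timer.

```python
import heapq
import itertools


class ReorderBuffer:
    """Buffers (timestamp, value) pairs and releases them in timestamp order.

    Hypothetical helper for illustration; not part of the Beam API.
    """

    def __init__(self):
        self._heap = []
        # Tie-breaker: equal timestamps keep arrival order, and the heap
        # never has to compare the values themselves.
        self._arrival = itertools.count()

    def add(self, timestamp, value):
        heapq.heappush(self._heap, (timestamp, next(self._arrival), value))

    def flush_until(self, watermark):
        """Return every buffered element with timestamp <= watermark, sorted."""
        ready = []
        while self._heap and self._heap[0][0] <= watermark:
            timestamp, _, value = heapq.heappop(self._heap)
            ready.append((timestamp, value))
        return ready


# Events arrive out of order; they are only released once the watermark
# says nothing earlier can still show up.
buf = ReorderBuffer()
for ts, name in [(120, "name_b"), (60, "name_a"), (240, "name_c")]:
    buf.add(ts, name)

first = buf.flush_until(130)  # elements with timestamp <= 130
rest = buf.flush_until(300)   # whatever became ready afterwards
```

The trade-off is latency: nothing is emitted until the watermark passes it, so
the output is ordered only as well as the watermark is accurate.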

@Pablo Estrada <[email protected]> Any other suggestions for supporting
CDC type pipelines?

On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong <[email protected]> wrote:

> Thanks a lot for the response!
>
> We have several business use cases that rely strongly on ordering by Kafka
> offset:
> 1) Streaming unwindowed inner join: say we want to join users with reviews
> on user_id. Here are the schemas for the two streams:
>     user:
>       - user_id
>       - name
>       - timestamp
>     reviews:
>       - review_id
>       - user_id
>       - timestamp
> Here are the messages in each stream, ordered by Kafka offset:
>     user:
>     (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
>     reviews:
>     (ABC, 1, 90), (DEF, 2, 360)
> I would expect to receive the following output messages:
>     (1, name_a, ABC) at timestamp 90
>     (1, name_c, ABC) at timestamp 240
>     (2, name_b, DEF) at timestamp 360
> This can be done in native Flink since Flink's Kafka consumer reads from
> each partition sequentially. But without an ordering guarantee, we can end
> up with arbitrary results. So how would we implement this in Beam?
> 2) Unwindowed aggregation: aggregate all the employees for every
> organization. Say we have a new employee stream with the following schema:
>     new_employee:
>       - organization_id
>       - employee_name
> And here are the messages, ordered by Kafka offset:
> (1, name_a), (2, name_b), (2, name_c), (1, name_d)
> I would expect the output to be:
> (1, [name_a]), (2, [name_b]), (2, [name_b, name_c]), (1, [name_a, name_d])
> Again, without an ordering guarantee, the result is nondeterministic.
>
> Change data capture (CDC) streams are a very common use case for our data
> pipeline. As in the examples above, we rely on Kafka offsets to make sure we
> process data mutations in the proper order. While in some cases we have
> Flink-native solutions to these problems (Flink provides ordering
> guarantees within the chosen key), we are now building some new Beam
> applications that would require ordering guarantees. What is the
> recommended approach in Beam for such use cases? If this isn’t currently
> supported, are there any near-term plans to add native ordering support in
> Beam?
>
>
> On 2020/06/09 20:37:22, Luke Cwik <[email protected]> wrote:
> > This will likely break due to:
> > * workers can have more than one thread and hence process the source in
> > parallel
> > * splitting a source allows for the source to be broken up into multiple
> > restrictions and hence the runner can process those restrictions in any
> > order they want. (let's say your Kafka partition has unconsumed commit
> > offset range [20, 100), this could be split into [20, 60), [60, 100) and
> > the [60, 100) offset range could be processed first)
> >
> > You're right that you need to sort the output however you want within your
> > DoFn before you make external calls to Kafka (this prevents you from using
> > the KafkaIO sink implementation as a transform). There is an annotation
> > @RequiresTimeSortedInput which is a special case for this sorting if you
> > want it to be sorted by the element's timestamp, but you'll still need to
> > write to Kafka directly yourself from your DoFn.
> >
> > On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang <[email protected]> wrote:
> >
> > > We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
> > > messages originate from a custom source that consumes messages from a
> > > Kafka topic and emits them in the order of their Kafka offsets to a
> > > DoFn. After this DoFn processes the messages, they are emitted to a
> > > custom sink that sends messages to a Kafka topic.
> > >
> > > We want to process those messages in the order in which we receive
> > > them from Kafka and then emit them to the Kafka sink in the same
> > > order, but based on our understanding Beam does not provide an
> > > in-order transport. However, in practice we noticed that with a Python
> > > SDK worker on Flink and a parallelism setting of 1 and one sdk_worker
> > > instance, messages seem to be both processed and emitted in order. Is
> > > that implementation-specific in-order behavior something that we can
> > > rely on, or is it very likely that this will break at some future
> > > point?
> > >
> > > In case it's not recommended to depend on that behavior, what is the
> > > best approach for in-order processing?
> > >
> > > https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
> > > recommends ordering events in a heap, but according to our
> > > understanding this approach will only work when directly writing to an
> > > external system.
