Hi,

I'm afraid @RequiresTimeSortedInput currently does not fit this requirement, because it works on event _timestamps_ only, and assigning Kafka offsets as event timestamps is probably not a good idea. The original proposal [1] mentions that it would be good to support sorting by some other field (which would have to correlate with the timestamps, as should be the case for Kafka offsets), but unfortunately that is not yet implemented. If you run your pipeline in streaming mode, it should be possible to implement this yourself using BagState and a Timer that fires periodically, gathers all elements with timestamp less than 'timerTimestamp - allowed lateness', sorts them by Kafka offset, and processes them. I wouldn't rely on Kafka offsets having no gaps, because they can have them (certainly for compacted topics, but very likely in other cases as well).
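
A minimal (untested) sketch of that in the Python SDK, since that is what you are running. The 60-second allowed lateness, the 10-second flush period, and the (partition, (offset, payload)) element shape are made-up placeholders:

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder, TupleCoder, VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

ALLOWED_LATENESS_SEC = 60  # assumed bound on out-of-orderness
FLUSH_INTERVAL_SEC = 10    # assumed timer period

class SortByOffsetFn(beam.DoFn):
  # Buffer of (event_time_micros, kafka_offset, payload) tuples per key.
  BUFFER = BagStateSpec(
      'buffer', TupleCoder((VarIntCoder(), VarIntCoder(), StrUtf8Coder())))
  FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

  def process(
      self,
      element,  # (partition, (offset, payload))
      timestamp=beam.DoFn.TimestampParam,
      buffer=beam.DoFn.StateParam(BUFFER),
      flush=beam.DoFn.TimerParam(FLUSH)):
    _, (offset, payload) = element
    buffer.add((timestamp.micros, offset, payload))
    # Fire once the watermark passes this element's timestamp plus the
    # allowed lateness. Each element overwrites the timer; good enough
    # for a sketch.
    flush.set(timestamp + ALLOWED_LATENESS_SEC)

  @on_timer(FLUSH)
  def flush_ready(
      self,
      fire_timestamp=beam.DoFn.TimestampParam,  # timestamp the timer was set for
      buffer=beam.DoFn.StateParam(BUFFER),
      flush=beam.DoFn.TimerParam(FLUSH)):
    # Emit only elements with timestamp < timerTimestamp - allowed lateness;
    # everything younger goes back into the bag.
    cutoff = (fire_timestamp - ALLOWED_LATENESS_SEC).micros
    ready, pending = [], []
    for record in buffer.read():
      (ready if record[0] < cutoff else pending).append(record)
    buffer.clear()
    for record in pending:
      buffer.add(record)
    if pending:
      # Keep firing periodically while anything remains buffered.
      flush.set(fire_timestamp + FLUSH_INTERVAL_SEC)
    # Process the ripe elements in Kafka-offset order.
    for _, offset, payload in sorted(ready, key=lambda r: r[1]):
      yield payload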

Jan

[1] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing

On 6/11/20 5:30 PM, Reuven Lax wrote:
I would not recommend using RequiresTimeSortedInput in this way. I also would not use ingestion time, as in a distributed environment, time skew between workers might mess up the order.

I will ping the discussion on the sorted state API and add you. My hope is that this can be implemented efficiently soon, though an efficient implementation will definitely be runner-dependent. If you are using Flink, we'll have to figure out how to implement this efficiently using Flink. Dataflow is planning on providing native support for sorted state. I don't know whether Flink has native support for this, so it might have to be emulated using its existing state primitives.

In the meantime, I would suggest using BagState along with timers. The timer can periodically pull sorted messages out of the bag (you can use watermark timers here) and write back the messages that still have gaps in them.
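
For example (a rough, untested sketch in the Python SDK; it tracks the next expected offset per key with a single-element bag, and it assumes per-key offsets start at 0 and would be dense, which may not hold in practice):

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder, TupleCoder, VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

class EmitInOffsetOrderFn(beam.DoFn):
  BUFFER = BagStateSpec('buffer', TupleCoder((VarIntCoder(), StrUtf8Coder())))
  # Single-element bag standing in for value state: next offset to emit.
  NEXT = BagStateSpec('next', VarIntCoder())
  FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

  def process(
      self,
      element,  # (key, (offset, payload))
      timestamp=beam.DoFn.TimestampParam,
      buffer=beam.DoFn.StateParam(BUFFER),
      flush=beam.DoFn.TimerParam(FLUSH)):
    _, (offset, payload) = element
    buffer.add((offset, payload))
    flush.set(timestamp)  # fire when the watermark passes this element

  @on_timer(FLUSH)
  def emit_contiguous(
      self,
      buffer=beam.DoFn.StateParam(BUFFER),
      next_state=beam.DoFn.StateParam(NEXT)):
    next_offset = next(iter(next_state.read()), 0)
    pending = sorted(buffer.read())
    buffer.clear()
    remainder = []
    for offset, payload in pending:
      if offset < next_offset:
        continue  # duplicate of something already emitted
      if offset == next_offset and not remainder:
        yield payload  # contiguous with what was emitted so far
        next_offset += 1
      else:
        remainder.append((offset, payload))
    # Write back the messages that still have gaps in front of them; they
    # stay buffered until a later element re-arms the timer.
    for record in remainder:
      buffer.add(record)
    next_state.clear()
    next_state.add(next_offset)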

Reuven

On Wed, Jun 10, 2020 at 6:34 PM Catlyn Kong <catl...@yelp.com> wrote:

    Thank y’all for the input!

    About @RequiresTimeSortedInput, we were thinking of the following
    two potential approaches:

    1. Assign the Kafka offset as the timestamp while doing a
       GroupByKey on partition_id in a GlobalWindow.

    2. Rely on the fact that Flink consumes from Kafka partitions in
       offset order and assign ingestion time as the timestamp. (We’re
       using our own non-KafkaIO-based Kafka consumer, extended from
       FlinkKafkaConsumer011, so we have direct control over timestamp
       and watermark assignment.)

    We find it non-trivial to reason about watermark assignment,
    especially when taking into consideration that:

    1. there might be restarts at any given time, and

    2. advancing the watermark in one Kafka partition might result in:

       1. dropping elements from other Kafka partitions (if we’re not
          following the native Flink approach of taking the lowest
          watermark when merging streams), or

       2. delaying output from other Kafka partitions, since they’ll
          be buffered.

    Is there any recommendation on how this should be handled? In the
    direction of using a StatefulDoFn to buffer and reorder, we’re
    concerned about performance, since we’d need to serialize and
    deserialize the entire BagState (with all the messages) every time
    we process a message, and we’d potentially have to insert this
    StatefulDoFn in multiple places in the pipeline. Is there any
    benchmark result of a pipeline that does something similar for us
    to reference? The proposal for a sorted state API sounds promising;
    is there a ticket/doc that we can follow?



    On Wed, Jun 10, 2020 at 1:28 PM Reuven Lax <re...@google.com> wrote:

        I don't know how well RequiresTimeSortedInput will work in the
        presence of late data.

        I think you will want to include the Kafka offset in your
        records (unless the records have their own sequence number)
        and then use state to buffer and sort. There is a proposal
        (and work in progress) for a sorted state API, which will make
        this easier and more efficient.

        Reuven

        On Wed, Jun 10, 2020 at 1:25 PM Luke Cwik <lc...@google.com> wrote:

            For runners that support @RequiresTimeSortedInput, all
            your input will come time-sorted (as long as your
            element's timestamp tracks the order that you want).
            For runners that don't support this, you need to build a
            StatefulDoFn that buffers out-of-order events and reorders
            them into the order that you need.

            @Pablo Estrada <pabl...@google.com> Any other suggestions
            for supporting CDC-type pipelines?

            On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong <catl...@yelp.com> wrote:

                Thanks a lot for the response!

                 We have several business use cases that rely strongly
                 on ordering by Kafka offset:

                 1) Streaming unwindowed inner join: say we want to
                 join users with reviews on user_id. Here are the
                 schemas for the two streams:

                     user:
                      * user_id
                      * name
                      * timestamp

                     reviews:
                      * review_id
                      * user_id
                      * timestamp

                 Here are the messages in each stream, ordered by Kafka
                 offset:

                     user:
                     (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
                     reviews:
                     (ABC, 1, 90), (DEF, 2, 360)

                 I would expect to receive the following output
                 messages:

                     (1, name_a, ABC) at timestamp 90
                     (1, name_c, ABC) at timestamp 240
                     (2, name_b, DEF) at timestamp 360

                 This can be done in native Flink, since the Flink
                 Kafka consumer reads from each partition sequentially.
                 But without an ordering guarantee, we can end up with
                 arbitrary results. So how would we implement this in
                 Beam?
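
                 For concreteness, here is roughly the stateful DoFn
                 we would write today (an untested sketch in the Python
                 SDK; it assumes the two streams are tagged
                 'user'/'review', flattened, and keyed by user_id):

                 import apache_beam as beam
                 from apache_beam.coders import StrUtf8Coder
                 from apache_beam.transforms.userstate import BagStateSpec

                 class JoinUsersAndReviewsFn(beam.DoFn):
                   # Latest name per user (single-element bag) plus all
                   # reviews seen so far for that user.
                   NAME = BagStateSpec('name', StrUtf8Coder())
                   REVIEWS = BagStateSpec('reviews', StrUtf8Coder())

                   def process(
                       self,
                       element,  # (user_id, ('user', name) / ('review', id))
                       name_state=beam.DoFn.StateParam(NAME),
                       reviews=beam.DoFn.StateParam(REVIEWS)):
                     user_id, (tag, value) = element
                     if tag == 'user':
                       name_state.clear()  # keep only the latest name
                       name_state.add(value)
                       # Re-emit the join for every review seen so far.
                       for review_id in reviews.read():
                         yield user_id, value, review_id
                     else:
                       reviews.add(value)
                       name = next(iter(name_state.read()), None)
                       if name is not None:
                         yield user_id, name, value

                 With per-partition ordering this produces exactly the
                 three outputs above; if the two updates for user 1
                 arrived swapped, we would end up with name_a as the
                 final name.
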
                 2) Unwindowed aggregation: aggregate all the employees
                 for every organization. Say we have a new-employee
                 stream with the following schema:

                     new_employee:
                      * organization_id
                      * employee_name

                 And here are the messages, ordered by Kafka offset:

                     (1, name_a), (2, name_b), (2, name_c), (1, name_d)

                 I would expect the output to be:

                     (1, [name_a]), (2, [name_b]), (2, [name_b, name_c]),
                     (1, [name_a, name_d])
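
                 The aggregation itself is then just per-key bag state
                 (again an untested sketch in the Python SDK):

                 import apache_beam as beam
                 from apache_beam.coders import StrUtf8Coder
                 from apache_beam.transforms.userstate import BagStateSpec

                 class AccumulateEmployeesFn(beam.DoFn):
                   EMPLOYEES = BagStateSpec('employees', StrUtf8Coder())

                   def process(
                       self,
                       element,  # (organization_id, employee_name)
                       employees=beam.DoFn.StateParam(EMPLOYEES)):
                     org_id, name = element
                     employees.add(name)
                     # Emit a snapshot of everyone seen so far.
                     yield org_id, list(employees.read())

                 But it produces the expected snapshots only if the
                 elements for each organization_id arrive in offset
                 order, which is exactly the guarantee in question.
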
                 Again, without an ordering guarantee the result is
                 non-deterministic. Change data capture (CDC) streams
                 are a very common use case for our data pipeline, and
                 as in the examples above we rely on Kafka offsets to
                 make sure we process data mutations in the proper
                 order. While in some cases we have Flink-native
                 solutions to these problems (Flink provides ordering
                 guarantees within the chosen key), we are now building
                 some new Beam applications that would require ordering
                 guarantees. What is the recommended approach in Beam
                 for such use cases? If this isn’t currently supported,
                 do we have any near-term plans to add native ordering
                 support to Beam?

                 On 2020/06/09 20:37:22, Luke Cwik <l...@google.com> wrote:
                 > This will likely break due to:
                 >
                 > * workers can have more than one thread and hence
                 > process the source in parallel
                 > * splitting a source allows for the source to be
                 > broken up into multiple restrictions, and hence the
                 > runner can process those restrictions in any order
                 > it wants. (Let's say your Kafka partition has an
                 > unconsumed commit offset range [20, 100); this could
                 > be split into [20, 60) and [60, 100), and the
                 > [60, 100) offset range could be processed first.)
                 >
                 > You're right that you need to sort the output however
                 > you want within your DoFn before you make external
                 > calls to Kafka (this prevents you from using the
                 > KafkaIO sink implementation as a transform). There is
                 > an annotation, @RequiresTimeSortedInput, which is a
                 > special case for this sorting if you want it to be
                 > sorted by the element's timestamp, but you'll still
                 > need to write to Kafka directly yourself from your
                 > DoFn.
                 >
                 > On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang
                 > <ha...@yelp.com> wrote:
                 >
                 > > We are using the Beam 2.20 Python SDK on a Flink
                 > > 1.9 runner. Our messages originate from a custom
                 > > source that consumes messages from a Kafka topic
                 > > and emits them in the order of their Kafka offsets
                 > > to a DoFn. After this DoFn processes the messages,
                 > > they are emitted to a custom sink that sends
                 > > messages to a Kafka topic.
                 > >
                 > > We want to process those messages in the order in
                 > > which we receive them from Kafka and then emit them
                 > > to the Kafka sink in the same order, but based on
                 > > our understanding Beam does not provide an in-order
                 > > transport. However, in practice we noticed that
                 > > with a Python SDK worker on Flink, a parallelism
                 > > setting of 1, and one sdk_worker instance, messages
                 > > seem to be both processed and emitted in order. Is
                 > > that implementation-specific in-order behavior
                 > > something that we can rely on, or is it very likely
                 > > that this will break at some future point?
                 > >
                 > > In case it's not recommended to depend on that
                 > > behavior, what is the best approach for in-order
                 > > processing?
                 > >
                 > > https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
                 > > recommends ordering events in a heap, but according
                 > > to our understanding this approach will only work
                 > > when directly writing to an external system.
