GitHub user rangadi opened a pull request:

    https://github.com/apache/beam/pull/3612

    Kafka exactly-once sink.

    Implementation of an exactly-once sink for Kafka, making use of
transactions added in Kafka 0.11. This requires a runner that provides
exactly-once semantics, such as Dataflow.
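    For context, the Kafka 0.11 feature this sink builds on is the
transactional producer API. Below is a minimal standalone sketch of that
API; the broker address, topic, and transactional id are placeholders, not
values from this PR:

        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class TransactionalProducerSketch {
          public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // The transactional id fences off zombie producers across retries.
            props.put("transactional.id", "sink-shard-0");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              producer.initTransactions();
              producer.beginTransaction();
              producer.send(new ProducerRecord<>("my-topic", "key", "value"));
              // Records become visible to read_committed consumers atomically.
              producer.commitTransaction();
            }
          }
        }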
    
    This is not ready for merge. I will iterate on it more once it goes
through initial feedback.
    
    There are a few minor TODOs left in the implementation, and of course
more sink tests need to be added.
    
    This uses a consumer group id for the topic to store metadata. It seems
to work well, but it is not very convenient for a user to manage. E.g. if
the user wants to restart a job from scratch using the same group_id, the
existing metadata prevents it from starting. The user has to use a new
group (which is perfectly fine) or needs to clear the metadata
programmatically. This sink could help users manage this better (e.g. an
option to discard the metadata). We will see.
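    To illustrate what clearing that metadata could look like, here is a
rough sketch using the plain Kafka consumer API. The group id, topic, and
partition are hypothetical; this is a possible workaround, not code from
this PR:

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.OffsetAndMetadata;
        import org.apache.kafka.common.TopicPartition;

        public class SinkMetadataTool {
          public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-sink-group");  // hypothetical sink group id
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
              TopicPartition tp = new TopicPartition("my-topic", 0);
              consumer.assign(Collections.singletonList(tp));

              // Inspect the metadata string committed for this shard, if any.
              OffsetAndMetadata committed = consumer.committed(tp);
              System.out.println("committed metadata: "
                  + (committed == null ? "<none>" : committed.metadata()));

              // Overwrite it with empty metadata before restarting from scratch.
              consumer.commitSync(
                  Collections.singletonMap(tp, new OffsetAndMetadata(0L, "")));
            }
          }
        }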
    
    How this works, from a comment in the code:
    
        //
        // Dataflow ensures at-least-once processing for side effects like
        // sinks. In order to provide exactly-once semantics, a sink needs to
        // be idempotent, or it should avoid writing records that have already
        // been written. This sink does the latter. All the records are
        // ordered across a fixed number of shards, and records in each shard
        // are written in order. It drops any records that are already written
        // and buffers those arriving out of order.
        //
        // An exactly-once sink involves two shuffles of the records:
        //            A -- GBK --> B -- GBK --> C
        //
        // Processing guarantees also require deterministic processing within
        // user transforms. In this case, that implies the order of the
        // records seen by C should not be affected by restarts in upstream
        // stages like B and A.
        //
        // A : Assigns a random shard to each message. Note that there are no
        //     ordering guarantees for writing user records to Kafka. Users
        //     can still control partitioning among topic partitions as with
        //     the regular sink (of course, there are no ordering guarantees
        //     in the regular Kafka sink either).
        // B : Assigns an id sequentially to each message within a shard.
        // C : Writes each shard to Kafka in sequential id order. In Dataflow,
        //     when C sees a record and its id, it implies that the record and
        //     the associated id are checkpointed to persistent storage, and
        //     the record will always have the same id, even across retries.
        //     Exactly-once semantics are achieved by writing records in the
        //     strict order of these checkpointed sequence ids.
        //
        // Parallelism for B and C is fixed to 'numShards', which defaults to
        // the number of partitions for the topic. A few reasons for that:
        //  - B & C implement their functionality using per-key state. The
        //    shard id makes it independent of the cardinality of the user
        //    key.
        //  - We create one producer per shard, and its 'transactional id' is
        //    based on the shard id. This requires the number of shards to be
        //    finite. It also helps with batching, and avoids repeatedly
        //    initializing producers and transactions.
        //  - Most importantly, each of the sharded writers stores the 'next
        //    message id' in partition metadata, which is committed atomically
        //    with Kafka transactions. This is critical to handle retries of C
        //    correctly. Initial testing showed the number of shards could be
        //    larger than the number of partitions for the topic.
        //
        // The number of shards can change across multiple runs of a pipeline
        // (e.g. a job upgrade in Dataflow).
        //
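    To make the A -- GBK --> B -- GBK --> C shape above concrete, here is a
rough, self-contained Beam sketch of the two-shuffle structure. The DoFns,
names, and the stdout "write" in C are illustrative stand-ins, not the
actual classes in this PR:

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.ThreadLocalRandom;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;
        import org.apache.beam.sdk.state.StateSpec;
        import org.apache.beam.sdk.state.StateSpecs;
        import org.apache.beam.sdk.state.ValueState;
        import org.apache.beam.sdk.transforms.Create;
        import org.apache.beam.sdk.transforms.DoFn;
        import org.apache.beam.sdk.transforms.GroupByKey;
        import org.apache.beam.sdk.transforms.ParDo;
        import org.apache.beam.sdk.values.KV;

        public class TwoShuffleSketch {
          static final int NUM_SHARDS = 4;

          // A: assign a random shard id to each record.
          static class AssignShardFn extends DoFn<String, KV<Integer, String>> {
            @ProcessElement
            public void process(ProcessContext c) {
              c.output(KV.of(ThreadLocalRandom.current().nextInt(NUM_SHARDS), c.element()));
            }
          }

          // B: number the records of each shard sequentially, using per-key state.
          static class AssignIdsFn
              extends DoFn<KV<Integer, Iterable<String>>, KV<Integer, KV<Long, String>>> {
            @StateId("nextId")
            private final StateSpec<ValueState<Long>> nextIdSpec = StateSpecs.value();

            @ProcessElement
            public void process(ProcessContext c, @StateId("nextId") ValueState<Long> nextId) {
              Long stored = nextId.read();
              long id = stored == null ? 0L : stored;
              for (String record : c.element().getValue()) {
                c.output(KV.of(c.element().getKey(), KV.of(id++, record)));
              }
              nextId.write(id);
            }
          }

          // C: emit each shard in strict id order (stdout stands in for Kafka).
          static class WriteShardFn extends DoFn<KV<Integer, Iterable<KV<Long, String>>>, Void> {
            @ProcessElement
            public void process(ProcessContext c) {
              List<KV<Long, String>> records = new ArrayList<>();
              c.element().getValue().forEach(records::add);
              records.sort((a, b) -> Long.compare(a.getKey(), b.getKey()));
              for (KV<Long, String> r : records) {
                System.out.printf("shard=%d id=%d value=%s%n",
                    c.element().getKey(), r.getKey(), r.getValue());
              }
            }
          }

          public static void main(String[] args) {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
            p.apply(Create.of("a", "b", "c", "d"))
                .apply("A:AssignShard", ParDo.of(new AssignShardFn()))
                .apply("Shuffle1", GroupByKey.create())
                .apply("B:AssignIds", ParDo.of(new AssignIdsFn()))
                .apply("Shuffle2", GroupByKey.create())
                .apply("C:WriteInOrder", ParDo.of(new WriteShardFn()));
            p.run().waitUntilFinish();
          }
        }

    Note that this sketch simply sorts each fully materialized group in C;
per the description above, the actual sink instead writes in id order as
records arrive, buffering those that are out of order and dropping those
already written.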
    
    
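    The bullet above about committing the shard's 'next message id'
atomically with the records maps onto the Kafka 0.11 client roughly as in
this sketch; the group id, topic, and metadata encoding are placeholders,
not the PR's actual code:

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.OffsetAndMetadata;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.TopicPartition;

        public class ShardWriterSketch {
          public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("transactional.id", "sink-shard-0"); // one producer per shard
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            long nextId = 42L; // next sequence id for this shard

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              producer.initTransactions();
              producer.beginTransaction();
              producer.send(new ProducerRecord<>("my-topic", "key", "value"));
              // Commit the shard's 'next message id' to the consumer group in
              // the SAME transaction as the records: either both become
              // visible or neither does, which is what makes retries of C safe.
              producer.sendOffsetsToTransaction(
                  Collections.singletonMap(
                      new TopicPartition("my-topic", 0), // shard's metadata partition
                      new OffsetAndMetadata(0L, "nextId=" + (nextId + 1))),
                  "my-sink-group");                      // placeholder group id
              producer.commitTransaction();
            }
          }
        }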


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rangadi/beam eo_sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3612
    
----
commit 76bab2d8ec83ccc2826c929a50fb13d62bb4685b
Author: Raghu Angadi <[email protected]>
Date:   2017-07-21T06:26:49Z

    Kafka exactly-once sink.
    Tested manually with direct runner and on dataflow.

----

