GitHub user rangadi opened a pull request:
https://github.com/apache/beam/pull/3612
Kafka exactly-once sink.
Implementation of an exactly-once sink for Kafka, making use of
transactions added in Kafka 0.11. This requires exactly-once semantics from
runners similar to Dataflow.
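For context, the Kafka 0.11 transactions this sink builds on are driven by a
small set of producer configs. Below is a minimal, hypothetical configuration
sketch (the broker address and id values are placeholders, not from this PR);
a stable 'transactional.id' is what ties a producer to its transaction state
across restarts:

```java
import java.util.Properties;

public class TxnProducerConfigSketch {
    // Builds the producer configuration needed for Kafka 0.11 transactions.
    // Values here are illustrative placeholders.
    static Properties transactionalConfig(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        // A stable transactional.id lets the broker fence zombie producers
        // and recover transaction state after a restart.
        props.setProperty("transactional.id", transactionalId);
        // Transactions require the idempotent producer.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = transactionalConfig("sink-shard-0");
        System.out.println(props.getProperty("transactional.id"));
        // With kafka-clients on the classpath, usage follows the standard
        // transactional protocol:
        //   producer.initTransactions();
        //   producer.beginTransaction();
        //   producer.send(record);
        //   producer.commitTransaction();  // or abortTransaction() on error
    }
}
```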
This is not ready for merge. I will iterate further once it goes through
initial feedback.
There are a few minor implementation TODOs, and of course more sink tests
need to be added.
This uses a consumer group id for the topic to store metadata. It seems to
work well, but it is not very convenient for the user to manage. E.g. if the user
wants to restart a job from scratch using the same group_id, the existing metadata
prevents it from starting. The user has to use a new group (which is perfectly fine)
or needs to clear the metadata programmatically. This sink could help users
manage this better (e.g. an option to discard metadata). We will see.
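To make the restart conflict concrete, here is a toy sketch in plain Java (no
Kafka involved; all names are invented for illustration): the committed
"next id" metadata keyed by (group id, shard) survives a job teardown, so a
fresh run of the same group id conflicts with the stale state unless the group
is changed or the metadata is cleared.

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataRestartSketch {
    // Simulates metadata the sink stores under a consumer group id:
    // one "next expected sequence id" per (groupId, shard). Invented names.
    static final Map<String, Long> committedNextId = new HashMap<>();

    static String key(String groupId, int shard) {
        return groupId + "/" + shard;
    }

    static void commit(String groupId, int shard, long nextId) {
        committedNextId.put(key(groupId, shard), nextId);
    }

    // A brand-new pipeline starts its sequence ids at 0. If old metadata
    // for the same group id says "expect id 500", the fresh run conflicts.
    static boolean canStartFresh(String groupId, int shard) {
        Long existing = committedNextId.get(key(groupId, shard));
        return existing == null || existing == 0L;
    }

    // The hypothetical "discard metadata" option discussed above.
    static void clearMetadata(String groupId, int shard) {
        committedNextId.remove(key(groupId, shard));
    }
}
```

E.g. after `commit("jobs-group", 0, 500L)`, `canStartFresh("jobs-group", 0)`
is false; a fresh start succeeds with a new group id or after `clearMetadata`.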
How this works, from a comment in the code:
//
// Dataflow ensures at-least-once processing for side effects like sinks. In order to provide
// exactly-once semantics, a sink needs to be idempotent or it should avoid writing records
// that have already been written. This sink does the latter. All the records are ordered
// across a fixed number of shards and records in each shard are written in order. It drops
// any records that are already written and buffers those arriving out of order.
//
// Exactly-once sink involves two shuffles of the records:
//    A -- GBK --> B -- GBK --> C
//
// Processing guarantees also require deterministic processing within user transforms.
// In this case that implies the order of the records seen by C should not be affected by
// restarts in upstream stages like B & A.
//
// A : Assigns a random shard for each message. Note that there are no ordering guarantees for
//     writing user records to Kafka. Users can still control partitioning among topic
//     partitions as with the regular sink (of course, there are no ordering guarantees in
//     the regular Kafka sink either).
// B : Assigns an id sequentially to each message within a shard.
// C : Writes each shard to Kafka in sequential id order. In Dataflow, when C sees a record
//     and its id, it implies that the record and the associated id are checkpointed to
//     persistent storage and this record will always have the same id, even in retries.
//     Exactly-once semantics are achieved by writing records in the strict order of
//     these checkpointed sequence ids.
//
// Parallelism for B and C is fixed to 'numShards', which defaults to the number of
// partitions for the topic. A few reasons for that:
//  - B & C implement their functionality using per-key state. Shard id makes it independent
//    of the cardinality of the user key.
//  - We create one producer per shard, and its 'transactional id' is based on the shard id.
//    This requires the number of shards to be finite. This also helps with batching, and
//    avoids repeatedly initializing producers and transactions.
//  - Most importantly, each of the sharded writers stores 'next message id' in partition
//    metadata, which is committed atomically with Kafka transactions. This is critical
//    to handle retries of C correctly. Initial testing showed the number of shards could be
//    larger than the number of partitions for the topic.
//
// The number of shards can change across multiple runs of a pipeline (job upgrade in
// Dataflow).
//
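The dedupe-and-reorder behavior of stage C described above can be sketched in
plain Java (no Beam or Kafka types; all names are invented for illustration).
The writer tracks the next expected sequence id per shard, drops ids it has
already written, and buffers out-of-order arrivals until the gap is filled:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ShardWriterSketch {
    // Per-shard state, analogous to what the sink would commit
    // atomically with each Kafka transaction.
    private long nextId = 0;
    private final TreeMap<Long, String> buffered = new TreeMap<>();
    private final List<String> written = new ArrayList<>();

    // Accepts a (sequenceId, record) pair as produced by stage B.
    // Duplicates (id < nextId) are dropped; out-of-order ids are
    // buffered; in-order ids are written, then the buffer is drained.
    public void receive(long id, String record) {
        if (id < nextId) {
            return; // already written in an earlier attempt; drop it
        }
        buffered.put(id, record);
        while (!buffered.isEmpty() && buffered.firstKey() == nextId) {
            written.add(buffered.pollFirstEntry().getValue());
            nextId++; // committed with the transaction in the real sink
        }
    }

    public List<String> written() {
        return written;
    }

    public long nextId() {
        return nextId;
    }
}
```

Feeding ids 0, 2, 1, then a retried duplicate 1 writes the records for
0, 1, 2 exactly once, in order; the duplicate is dropped.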
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rangadi/beam eo_sink
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/beam/pull/3612.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3612
----
commit 76bab2d8ec83ccc2826c929a50fb13d62bb4685b
Author: Raghu Angadi <[email protected]>
Date: 2017-07-21T06:26:49Z
Kafka exactly-once sink.
Tested manually with direct runner and on dataflow.
----