Fabien LD commented on KAFKA-6915:

The above solution also works when MirrorMaker is used in push mode: it does
not have to wait for an ack from the aggregate cluster before committing
offsets, because the offsets are handled by that aggregate cluster once it has
received all data and offsets.
Note: this could use transaction-like processing: if we can write all
records and offsets, then commit the transaction. Otherwise, do not commit it, and
consumers with {{isolation.level=read_committed}} will ignore them and not see 
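
The transaction-like processing described in this note can be illustrated with a
small in-memory model. This is only a sketch under stated assumptions:
TargetClusterSketch and all of its methods are hypothetical names standing in
for the aggregate cluster, not the Kafka client API. It shows records and
source offsets becoming visible together on commit, and both being dropped on
abort.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the aggregate (target) cluster.
// It is NOT the Kafka client API; it only models the commit/abort logic.
class TargetClusterSketch {
    private final List<String> committedRecords = new ArrayList<>();
    private final Map<Integer, Long> committedOffsets = new HashMap<>();

    // State of the open transaction; invisible to read_committed readers.
    private List<String> pendingRecords = new ArrayList<>();
    private Map<Integer, Long> pendingOffsets = new HashMap<>();

    // Stage one mirrored record together with its source position.
    void write(int sourcePartition, long sourceOffset, String value) {
        pendingRecords.add(value);
        // Store the next offset to consume, as Kafka offset commits do.
        pendingOffsets.put(sourcePartition, sourceOffset + 1);
    }

    // Records and offsets become visible together.
    void commit() {
        committedRecords.addAll(pendingRecords);
        committedOffsets.putAll(pendingOffsets);
        abort(); // clears the pending state that was just published
    }

    // Drop everything written since the last commit.
    void abort() {
        pendingRecords = new ArrayList<>();
        pendingOffsets = new HashMap<>();
    }

    // What a consumer with isolation.level=read_committed would see.
    List<String> readCommitted() {
        return new ArrayList<>(committedRecords);
    }

    // Where a restarted mirror should resume for a source partition.
    long resumeOffset(int sourcePartition) {
        return committedOffsets.getOrDefault(sourcePartition, 0L);
    }

    public static void main(String[] args) {
        TargetClusterSketch target = new TargetClusterSketch();
        target.write(0, 0L, "a");
        target.write(0, 1L, "b");
        target.commit();
        target.write(0, 2L, "c");
        target.abort(); // simulated failure before the commit
        System.out.println(target.readCommitted());
        System.out.println(target.resumeOffset(0));
    }
}
```

In this model the aborted record "c" is never visible, and the stored offset
still points at source offset 2, so a restarted mirror would fetch "c" again
and write it exactly once.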

Another option to reduce the risk of duplicates, only useful when using
MirrorMaker in pull mode, is to have a way to delay the rebalance decision when
a consumer disappears. If all consumers of the group disappear during that
delay, then we likely have a global network issue. Then, when they come back,
if they all come back within {{group.initial.rebalance.delay.ms}} and have the
right generation, we accept their offset commits and keep going.
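
The delayed rebalance decision could be modelled roughly as the state machine
below. This is a sketch of the idea only, not existing Kafka behaviour:
DelayedRebalanceSketch, lossDelayMs and every method name are hypothetical, and
returnDelayMs merely plays the role this comment gives to
{{group.initial.rebalance.delay.ms}}. A rebalance is treated as terminal to
keep the example short.

```java
// Hypothetical coordinator-side logic; none of these names exist in Kafka.
class DelayedRebalanceSketch {
    enum State { STABLE, WAITING, SUSPECTED_OUTAGE, REBALANCED }

    private final int groupSize;
    private final long lossDelayMs;   // assumed new setting: wait after the first loss
    private final long returnDelayMs; // role given to group.initial.rebalance.delay.ms
    private int generation = 1;
    private int missing = 0;
    private long firstLossAtMs = -1;
    private long firstReturnAtMs = -1;
    private State state = State.STABLE;

    DelayedRebalanceSketch(int groupSize, long lossDelayMs, long returnDelayMs) {
        this.groupSize = groupSize;
        this.lossDelayMs = lossDelayMs;
        this.returnDelayMs = returnDelayMs;
    }

    // A member's session expired at nowMs.
    void memberLost(long nowMs) {
        expire(nowMs);
        if (state == State.REBALANCED || missing == groupSize) return;
        if (state == State.STABLE) {
            state = State.WAITING;
            firstLossAtMs = nowMs;
        }
        missing++;
        if (missing == groupSize) {
            state = State.SUSPECTED_OUTAGE; // whole group vanished within the delay
        }
    }

    // A missing member reconnected at nowMs, presenting the generation it knows.
    void memberReturned(long nowMs, int memberGeneration) {
        expire(nowMs);
        if (state == State.REBALANCED || missing == 0) return;
        if (memberGeneration != generation) {
            rebalance();
            return;
        }
        if (state == State.SUSPECTED_OUTAGE && firstReturnAtMs < 0) {
            firstReturnAtMs = nowMs; // start the window for the others to return
        }
        missing--;
        if (missing == 0) {
            state = State.STABLE; // everyone is back: keep the generation
            firstReturnAtMs = -1;
        }
    }

    // Apply whichever deadline is currently running.
    void expire(long nowMs) {
        boolean lossDeadline = state == State.WAITING
                && nowMs - firstLossAtMs >= lossDelayMs;
        boolean returnDeadline = state == State.SUSPECTED_OUTAGE
                && firstReturnAtMs >= 0
                && nowMs - firstReturnAtMs >= returnDelayMs;
        if (lossDeadline || returnDeadline) rebalance();
    }

    private void rebalance() {
        generation++;
        state = State.REBALANCED;
    }

    // Offset commits are accepted only from the current generation of a stable group.
    boolean acceptsCommit(int memberGeneration) {
        return state == State.STABLE && memberGeneration == generation;
    }

    State state() { return state; }
    int generation() { return generation; }

    public static void main(String[] args) {
        DelayedRebalanceSketch group = new DelayedRebalanceSketch(3, 10_000, 3_000);
        group.memberLost(0);
        group.memberLost(100);
        group.memberLost(200);          // all three gone: suspected outage
        group.memberReturned(60_000, 1);
        group.memberReturned(60_500, 1);
        group.memberReturned(61_000, 1); // all back within returnDelayMs
        System.out.println(group.state() + " " + group.acceptsCommit(1));
    }
}
```

The design choice here is that a total loss of the group suspends the normal
loss deadline until the first member reappears, which matches the assumption
that a global network issue should not by itself change the generation.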

> MirrorMaker: avoid duplicates when source cluster is unreachable for more 
> than session.timeout.ms
> -------------------------------------------------------------------------------------------------
>                 Key: KAFKA-6915
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6915
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 1.1.0
>            Reporter: Fabien LD
>            Priority: Major
>              Labels: mirror-maker
> According to the documentation (see
> [https://kafka.apache.org/11/documentation.html#semantics]), exactly-once
> delivery can be achieved by storing offsets in the same store as the produced
> data:
> {quote}
> When writing to an external system, the limitation is in the need to 
> coordinate the consumer's position with what is actually stored as output. 
> The classic way of achieving this would be to introduce a two-phase commit 
> between the storage of the consumer position and the storage of the consumers 
> output. But this can be handled more simply and generally by letting the 
> consumer store its offset in the same place as its output
> {quote}
> Indeed, with the current implementation, where the consumer stores the
> offsets in the source cluster, we can have duplicates if a network issue makes
> the source cluster unreachable for more than {{session.timeout.ms}}.
> Once that amount of time has passed, the source cluster rebalances the
> consumer group. Later, when the network is back, the generation has changed
> and consumers cannot commit the offsets for the last batches of records
> consumed (actually all records processed during the last
> {{auto.commit.interval.ms}}). So all those records are processed again when
> the consumers of the group come back.
> Storing the offsets in the target cluster would resolve this risk of 
> duplicate records and would be a nice feature to have.

