[ 
https://issues.apache.org/jira/browse/KAFKA-2894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15383859#comment-15383859
 ] 

Lucas Ariel Martinez commented on KAFKA-2894:
---------------------------------------------

I apologize if I am asking in the wrong place, but I believe this issue could
solve the problem I am having, which I’ll explain below.
First, how do connectors actually manage their own offsets? I tried creating a
sink connector whose task extends SinkTask, and:

1) I tried passing a Map<TopicPartition, Long> of offsets to the context
obtained in initialize() by calling context.offset(offsetMap). But this
resulted in the exception “No current assignment for partition…” because the
partitions had not been assigned yet.

2) I tried doing the same in the open() method, saving the context from
initialize() in a field and passing it a map built from the
Collection<TopicPartition> parameter. The result was that the task received
all the messages from the last committed Kafka offset and then, again, all the
messages from the offsets I had assigned manually to the partitions!

I also tried setting the worker property “auto.offset.reset=latest” as a
workaround, but apparently it is not supported, and the default value of
earliest was used anyway.
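The offset map from item 2 can be sketched with plain Java collections. This is a minimal, hypothetical sketch, not the Kafka Connect API: `Partition` stands in for Kafka's `TopicPartition`, `buildRewindMap` is an illustrative name, and `storedOffsets` is an assumed connector-managed store holding the last offset already written downstream, so the value placed in the map is that offset + 1 (the next record to consume). Only partitions in the current assignment get an entry, mirroring the “No current assignment” error from item 1. It only shows how the map is built; it does not address the rewind-on-rebalance problem this issue is about.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RewindMapSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public record Partition(String topic, int partition) {}

    // Builds the map that would be handed to context.offset(...) from open().
    // Assumption: storedOffsets is a hypothetical connector-managed store holding
    // the last offset already written downstream, so consumption resumes at last + 1.
    public static Map<Partition, Long> buildRewindMap(Collection<Partition> assigned,
                                                      Map<Partition, Long> storedOffsets) {
        Map<Partition, Long> rewind = new HashMap<>();
        for (Partition tp : assigned) {
            Long last = storedOffsets.get(tp);
            if (last != null) {
                // Only currently assigned partitions get an entry.
                rewind.put(tp, last + 1);
            }
            // Partitions with no stored offset are left out, so the consumer keeps
            // its normal starting position for them.
        }
        return rewind;
    }

    public static void main(String[] args) {
        Partition p0 = new Partition("events", 0);
        Partition p1 = new Partition("events", 1);
        Partition p2 = new Partition("events", 2);
        // p2 has a stored offset but is not in the current assignment.
        Map<Partition, Long> stored = Map.of(p0, 41L, p2, 7L);
        Map<Partition, Long> rewind = buildRewindMap(List.of(p0, p1), stored);
        System.out.println(rewind.get(p0));          // 42
        System.out.println(rewind.containsKey(p1));  // false (nothing stored)
        System.out.println(rewind.containsKey(p2));  // false (not assigned)
    }
}
```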

Am I wrong to think that rewinding offsets on rebalance would solve this issue,
so that I could manually assign offsets in the open() method?


> WorkerSinkTask doesn't handle rewinding offsets on rebalance
> ------------------------------------------------------------
>
>                 Key: KAFKA-2894
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2894
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Liquan Pei
>            Priority: Blocker
>             Fix For: 0.10.1.0
>
>
> rewind() is only invoked at the beginning of each poll(). This means that if 
> a rebalance occurs in the poll, it's feasible to get data that doesn't match 
> a request to change offsets during the rebalance. I think the consumer will 
> hold on to consumer data across the rebalance if it is reassigned the same 
> offset, so there may already be data ready to be delivered. Additionally we 
> may already have data in an incomplete messageBatch that should be discarded 
> when the rewind is requested.
> While connectors that care about this (i.e. ones that manage their own 
> offsets) can handle this correctly by tracking the offsets they're expecting 
> to see, it's a hassle, error-prone, and pretty unintuitive.
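The workaround mentioned in the description, tracking the offsets a task expects to see and discarding anything else, can be sketched with plain Java collections. This is a minimal, hypothetical sketch: `ExpectedOffsetFilter`, `expect()` and `accept()` are illustrative names, and `Partition` stands in for Kafka's `TopicPartition`. A task would call `expect()` whenever it requests a rewind and `accept()` for every record handed to put(). The sketch assumes offsets within a partition are contiguous (for example, a non-compacted topic), so the only acceptable record is the one at exactly the expected offset; anything else is treated as stale data from before the rewind took effect.

```java
import java.util.HashMap;
import java.util.Map;

public class ExpectedOffsetFilter {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public record Partition(String topic, int partition) {}

    // Next offset this task expects to receive, per partition.
    private final Map<Partition, Long> expected = new HashMap<>();

    // Register the offset the task asked to rewind to (the value given to context.offset()).
    public void expect(Partition tp, long nextOffset) {
        expected.put(tp, nextOffset);
    }

    // Returns true if the record should be processed, false if it should be skipped.
    // Assumption: offsets are contiguous, so a mismatch means the record is stale.
    public boolean accept(Partition tp, long offset) {
        Long next = expected.get(tp);
        if (next != null && offset != next) {
            return false;                 // data that does not match the requested rewind
        }
        expected.put(tp, offset + 1);     // advance to the following offset
        return true;
    }

    public static void main(String[] args) {
        ExpectedOffsetFilter filter = new ExpectedOffsetFilter();
        Partition p0 = new Partition("events", 0);
        filter.expect(p0, 50L);           // hypothetical: task rewound partition 0 to 50
        // Hypothetical delivery order: 100 and 101 were fetched before the rewind applied.
        long[] delivered = {100, 101, 50, 51, 52};
        for (long offset : delivered) {
            // Prints false, false, true, true, true.
            System.out.println(offset + " -> " + filter.accept(p0, offset));
        }
    }
}
```

Rejecting mismatched records rather than buffering them keeps the sketch small; a real task would also need to drop its expectations for partitions that are revoked, and would need a looser rule for topics where offsets can have gaps.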



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
