On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> Hey Colin,
> 
> Thanks for reviewing the KIP.
> 
> If I understand you right, you may be suggesting that we can use a global
> metadataEpoch that is incremented every time the controller updates
> metadata. The problem with this solution is that, if a topic is deleted and
> created again, the user will not know that the offset stored before the
> topic deletion is no longer valid. This motivates the idea of including a
> per-partition partitionEpoch. Does this sound reasonable?

Hi Dong,

Perhaps we can store the last valid offset of each deleted topic in ZooKeeper.  
Then, when a topic with one of those names gets re-created, we can start the 
topic at the previous end offset rather than at 0.  This preserves 
immutability.  It is no more burdensome than having to preserve a "last epoch" 
for the deleted partition somewhere, right?
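(To make that concrete, here is a rough sketch of what I have in mind.  The
storage interface below stands in for ZooKeeper, and all of the names are
made up for illustration -- this is not actual controller code.)

import java.util.Map;

interface OffsetTombstoneStorage {
    // Stand-in for ZooKeeper: persists per-partition end offsets under a path.
    void put(String path, Map<Integer, Long> endOffsets);
    // Returns null if the path does not exist.
    Map<Integer, Long> get(String path);
}

class TopicRecreationOffsets {
    private final OffsetTombstoneStorage storage;

    TopicRecreationOffsets(OffsetTombstoneStorage storage) {
        this.storage = storage;
    }

    // On topic deletion: record each partition's end offset before the
    // topic's metadata goes away.
    void onTopicDeleted(String topic, Map<Integer, Long> endOffsets) {
        storage.put("/deleted_topics/" + topic, endOffsets);
    }

    // On topic re-creation: start each partition at the old end offset
    // rather than at 0, which preserves offset immutability.
    long startingOffset(String topic, int partition) {
        Map<Integer, Long> old = storage.get("/deleted_topics/" + topic);
        return (old == null) ? 0L : old.getOrDefault(partition, 0L);
    }
}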

> 
> Then the next question may be: should we use a global metadataEpoch +
> per-partition partitionEpoch, instead of a per-partition leaderEpoch +
> per-partition partitionEpoch? The former solution, using a global
> metadataEpoch, would not work due to the following scenario (provided by Jun):
> 
> "Consider the following scenario. In metadata v1, the leader for a
> partition is at broker 1. In metadata v2, leader is at broker 2. In
> metadata v3, leader is at broker 1 again. The last committed offset in v1,
> v2 and v3 are 10, 20 and 30, respectively. A consumer is started and reads
> metadata v1 and reads messages from offset 0 to 25 from broker 1. My
> understanding is that in the current proposal, the metadata version
> associated with offset 25 is v1. The consumer is then restarted and fetches
> metadata v2. The consumer tries to read from broker 2, which is the old
> leader with the last offset at 20. In this case, the consumer will still
> get OffsetOutOfRangeException incorrectly."
> 
> Regarding your comment "For the second purpose, this is "soft state"
> anyway.  If the client thinks X is the leader but Y is really the leader,
> the client will talk to X, and X will point out its mistake by sending back
> a NOT_LEADER_FOR_PARTITION.", it is probably not true. The problem here is
> that the old leader X may still think it is the leader of the partition and
> thus it will not send back NOT_LEADER_FOR_PARTITION. The reason is provided
> in KAFKA-6262. Can you check if that makes sense?

This is solvable with a timeout, right?  If the leader can't communicate with 
the controller for a certain period of time, it should stop acting as the 
leader.  We have to solve this problem, anyway, in order to fix all the corner 
cases.
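(Sketching the timeout idea -- the lease-style mechanism and the names here
are illustrative, not the actual broker implementation:)

class LeaderLease {
    private final long timeoutMs;
    private volatile long lastControllerContactMs;

    LeaderLease(long timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.lastControllerContactMs = System.currentTimeMillis();
    }

    // Called whenever the broker processes a request from the controller.
    void onControllerContact() {
        lastControllerContactMs = System.currentTimeMillis();
    }

    // Checked before serving as leader: once the lease expires, the broker
    // should answer NOT_LEADER_FOR_PARTITION instead of accepting writes.
    boolean mayActAsLeader() {
        return System.currentTimeMillis() - lastControllerContactMs < timeoutMs;
    }
}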

best,
Colin

> 
> Regards,
> Dong
> 
> 
> On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <cmcc...@apache.org> wrote:
> 
> > Hi Dong,
> >
> > Thanks for proposing this KIP.  I think a metadata epoch is a really good
> > idea.
> >
> > I read through the DISCUSS thread, but I still don't have a clear picture
> > of why the proposal uses a metadata epoch per partition rather than a
> > global metadata epoch.  A metadata epoch per partition is kind of
> > unpleasant-- it's at least 4 extra bytes per partition that we have to send
> > over the wire in every full metadata request, which could become extra
> > kilobytes on the wire when the number of partitions becomes large.  Plus,
> > we have to update all the auxiliary classes to include an epoch.
> >
> > We need to have a global metadata epoch anyway to handle partition
> > addition and deletion.  For example, if I give you
> > MetadataResponse{part1, epoch 1, part2, epoch 1} and {part1, epoch 1}, which
> > MetadataResponse is newer?  You have no way of knowing.  It could be that
> > part2 has just been created, and the response with 2 partitions is newer.
> > Or it could be that part2 has just been deleted, and therefore the response
> > with 1 partition is newer.  You must have a global epoch to disambiguate
> > these two cases.
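(To illustrate the quoted point: with a single global epoch, deciding which
of two metadata responses is newer no longer depends on the partition lists
at all.  A minimal sketch, with made-up class names:)

import java.util.Set;

class MetadataSnapshot {
    final long globalEpoch;
    final Set<String> partitions;

    MetadataSnapshot(long globalEpoch, Set<String> partitions) {
        this.globalEpoch = globalEpoch;
        this.partitions = partitions;
    }

    // {part1, part2} at global epoch 1 vs {part1} at global epoch 2: the
    // second wins, so part2 must have been deleted -- no ambiguity.
    boolean isNewerThan(MetadataSnapshot other) {
        return this.globalEpoch > other.globalEpoch;
    }
}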
> >
> > Previously, I worked on the Ceph distributed filesystem.  Ceph had the
> > concept of a map of the whole cluster, maintained by a few servers doing
> > paxos.  This map was versioned by a single 64-bit epoch number which
> > increased on every change.  It was propagated to clients through gossip.  I
> > wonder if something similar could work here?
> >
> > It seems like the Kafka MetadataResponse serves two somewhat unrelated
> > purposes.  Firstly, it lets clients know what partitions exist in the
> > system and where they live.  Secondly, it lets clients know which nodes
> > within the partition are in-sync (in the ISR) and which node is the leader.
> >
> > The first purpose is what you really need a metadata epoch for, I think.
> > You want to know whether a partition exists or not, or you want to know
> > which nodes you should talk to in order to write to a given partition.  A
> > single metadata epoch for the whole response should be adequate here.  We
> > should not change the partition assignment without going through zookeeper
> > (or a similar system), and this inherently serializes updates into a
> > numbered stream.  Brokers should also stop responding to requests when they
> > are unable to contact ZK for a certain time period.  This prevents the case
> > where a given partition has been moved off some set of nodes, but a client
> > still ends up talking to those nodes and writing data there.
> >
> > For the second purpose, this is "soft state" anyway.  If the client thinks
> > X is the leader but Y is really the leader, the client will talk to X, and
> > X will point out its mistake by sending back a NOT_LEADER_FOR_PARTITION.
> > Then the client can update its metadata again and find the new leader, if
> > there is one.  There is no need for an epoch to handle this.  Similarly, I
> > can't think of a reason why changing the in-sync replica set needs to bump
> > the epoch.
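(The recovery loop described in the quoted paragraph would look roughly
like this; all of the types below are stand-ins, not the real Kafka client
classes:)

class LeaderAwareClient {
    interface Request {}
    interface Response { boolean notLeaderForPartition(); }
    interface Broker { Response send(Request request); }
    // Assumed to refresh its view from a MetadataResponse as needed.
    interface MetadataView { Broker leaderFor(String topicPartition); }

    private final MetadataView metadata;

    LeaderAwareClient(MetadataView metadata) {
        this.metadata = metadata;
    }

    Response sendToLeader(String topicPartition, Request request, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            // Our view may be stale; that is fine, since the broker we
            // contact will correct us with NOT_LEADER_FOR_PARTITION.
            Broker presumedLeader = metadata.leaderFor(topicPartition);
            Response response = presumedLeader.send(request);
            if (!response.notLeaderForPartition()) {
                return response;
            }
            // Stale leader: loop around, refreshing metadata on the way.
        }
        throw new IllegalStateException("No leader found after retries");
    }
}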
> >
> > best,
> > Colin
> >
> >
> > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> > > Thanks much for reviewing the KIP!
> > >
> > > Dong
> > >
> > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Yeah that makes sense, again I'm just making sure we understand all the
> > > > scenarios and what to expect.
> > > >
> > > > I agree that if, more generally speaking, say a user has only
> > > > consumed to offset 8, and then calls seek(16) to "jump" to a further
> > > > position, then she needs to be aware that OORE may be thrown and she
> > > > needs to handle it or rely on the reset policy, which should not
> > > > surprise her.
> > > >
> > > >
> > > > I'm +1 on the KIP.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Yes, in general we cannot prevent OffsetOutOfRangeException if the
> > > > > user seeks to a wrong offset. The main goal is to prevent
> > > > > OffsetOutOfRangeException if the user has done things in the right
> > > > > way, e.g. the user should know that there is a message with this
> > > > > offset.
> > > > >
> > > > > For example, if the user calls seek(..) right after construction,
> > > > > the only reason I can think of is that the user stores offsets
> > > > > externally. In this case, the user currently needs to use the
> > > > > offset obtained from position(..) in the last run. With this KIP,
> > > > > the user needs to get the offset and the offsetEpoch using
> > > > > positionAndOffsetEpoch(...) and store this information externally.
> > > > > The next time the user starts the consumer, he/she needs to call
> > > > > seek(..., offset, offsetEpoch) right after construction. Then the
> > > > > KIP should be able to ensure that we don't throw
> > > > > OffsetOutOfRangeException if there is no unclean leader election.
> > > > >
> > > > > Does this sound OK?
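(For concreteness, the external-offset-storage flow described above might
look like the following sketch.  positionAndOffsetEpoch and the seek
overload are the names proposed in the KIP; the consumer interface and the
ExternalStore are hypothetical stand-ins:)

class OffsetAndEpoch {
    final long offset;
    final int epoch;

    OffsetAndEpoch(long offset, int epoch) {
        this.offset = offset;
        this.epoch = epoch;
    }
}

interface EpochAwareConsumer {
    // Proposed by the KIP: position plus the epoch the user should store.
    OffsetAndEpoch positionAndOffsetEpoch(String topicPartition);
    // Proposed by the KIP: seek that also validates the stored epoch.
    void seek(String topicPartition, long offset, int epoch);
}

interface ExternalStore {
    void save(String key, OffsetAndEpoch value);
    OffsetAndEpoch load(String key);
}

class ExternalOffsetFlow {
    // On shutdown: persist the offset together with its epoch.
    static void checkpoint(EpochAwareConsumer consumer, ExternalStore store,
                           String topicPartition) {
        store.save(topicPartition, consumer.positionAndOffsetEpoch(topicPartition));
    }

    // On restart: seek with both values, so brokers can detect stale
    // metadata instead of spuriously throwing OffsetOutOfRangeException.
    static void restore(EpochAwareConsumer consumer, ExternalStore store,
                        String topicPartition) {
        OffsetAndEpoch saved = store.load(topicPartition);
        consumer.seek(topicPartition, saved.offset, saved.epoch);
    }
}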
> > > > >
> > > > > Regards,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang <wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > "If consumer wants to consume message with offset 16, then consumer
> > > > must
> > > > > > have
> > > > > > already fetched message with offset 15"
> > > > > >
> > > > > > --> this may not be always true right? What if consumer just call
> > > > > seek(16)
> > > > > > after construction and then poll without committed offset ever
> > stored
> > > > > > before? Admittedly it is rare but we do not programmably disallow
> > it.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hey Guozhang,
> > > > > > >
> > > > > > > Thanks much for reviewing the KIP!
> > > > > > >
> > > > > > > In the scenario you described, let's assume that broker A has
> > > > > > > messages with offsets up to 10, and broker B has messages with
> > > > > > > offsets up to 20. If the consumer wants to consume the message
> > > > > > > with offset 9, it will not receive OffsetOutOfRangeException
> > > > > > > from broker A.
> > > > > > >
> > > > > > > If the consumer wants to consume the message with offset 16,
> > > > > > > then the consumer must have already fetched the message with
> > > > > > > offset 15, which can only come from broker B. Because the
> > > > > > > consumer will fetch from broker B only if leaderEpoch >= 2, the
> > > > > > > current consumer leaderEpoch cannot be 1, since this KIP
> > > > > > > prevents leaderEpoch rewind. Thus we will not have
> > > > > > > OffsetOutOfRangeException in this case.
> > > > > > >
> > > > > > > Does this address your question, or maybe there is a more
> > > > > > > advanced scenario that the KIP does not handle?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang
> > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > > > > > > >
> > > > > > > > Just a quick question: can we completely eliminate the
> > > > > > > > OffsetOutOfRangeException with this approach? Say there are
> > > > > > > > consecutive leader changes such that the cached metadata's
> > > > > > > > partition epoch is 1, and the metadata fetch response returns
> > > > > > > > with partition epoch 2 pointing to leader broker A, while the
> > > > > > > > actual up-to-date metadata has partition epoch 3 whose leader
> > > > > > > > is now broker B; the metadata refresh will still succeed and
> > > > > > > > the follow-up fetch request may still see OORE?
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin
> > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I would like to start the voting process for KIP-232:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
> > > > > > > > >
> > > > > > > > > The KIP will help fix a concurrency issue in Kafka which
> > > > > > > > > can currently cause message loss or message duplication in
> > > > > > > > > the consumer.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Dong
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> >
