Hey Jun,

Certainly. We can discuss later after KIP-320 settles.

Thanks!
Dong


On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Sorry for the late response. Since KIP-320 is covering some of the similar
> problems described in this KIP, perhaps we can wait until KIP-320 settles
> and see what's still left uncovered in this KIP.
>
> Thanks,
>
> Jun
>
> On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > It seems that we have made considerable progress on the discussion of
> > KIP-253 since February. Do you think we should continue the discussion
> > there, or can we continue the voting for this KIP? I am happy to submit
> > the PR and move this KIP forward.
> >
> > Thanks!
> > Dong
> >
> >
> > On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Sure, I will come up with a KIP this week. I think there is a way to
> > > allow partition expansion to an arbitrary number without introducing
> > > new concepts such as read-only partitions or a repartition epoch.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > >> Hi, Dong,
> > >>
> > > >> Thanks for the reply. The general idea that you had for adding
> > > >> partitions is similar to what we had in mind. It would be useful to
> > > >> make this more general, allowing adding an arbitrary number of
> > > >> partitions (instead of just doubling) and potentially removing
> > > >> partitions as well. The following is the high-level idea from the
> > > >> discussion with Colin, Jason and Ismael.
> > >>
> > > >> * To change the number of partitions from X to Y in a topic, the
> > > >> controller marks all existing X partitions as read-only and creates Y
> > > >> new partitions. The new partitions are writable and are tagged with a
> > > >> higher repartition epoch (RE).
> > > >>
> > > >> * The controller propagates the new metadata to every broker. Once the
> > > >> leader of a partition is marked as read-only, it rejects the produce
> > > >> requests on this partition. The producer will then refresh the
> > > >> metadata and start publishing to the new writable partitions.
> > > >>
> > > >> * The consumers will then be consuming messages in RE order. The
> > > >> consumer coordinator will only assign partitions in the same RE to
> > > >> consumers. Only after all messages in an RE are consumed will
> > > >> partitions in a higher RE be assigned to consumers.
> > >>
> > >> As Colin mentioned, if we do the above, we could potentially (1) use a
> > >> globally unique partition id, or (2) use a globally unique topic id to
> > >> distinguish recreated partitions due to topic deletion.
> > >>
> > > >> So, perhaps we can sketch out the re-partitioning KIP a bit more and
> > > >> see if there is any overlap with KIP-232. Would you be interested in
> > > >> doing that? If not, we can do that next week.
> > >>
> > >> Jun
> > >>
> > >>
> > > >> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >>
> > >> > Hey Jun,
> > >> >
> > > >> > Interestingly, I am also planning to sketch a KIP to allow partition
> > > >> > expansion for keyed topics after this KIP. Since you are already
> > > >> > doing that, I guess I will just share my high-level idea here in case
> > > >> > it is helpful.
> > >> >
> > > >> > The motivation for the KIP is that we currently lose the ordering
> > > >> > guarantee for messages with the same key if we expand the partitions
> > > >> > of a keyed topic.
> > >> >
> > >> > The solution can probably be built upon the following ideas:
> > >> >
> > > >> > - Partition number of the keyed topic should always be doubled (or
> > > >> > multiplied by a power of 2). Given that we select a partition based
> > > >> > on hash(key) % partitionNum, this should help us ensure that a
> > > >> > message assigned to an existing partition will not be mapped to
> > > >> > another existing partition after partition expansion.
> > >> >
> > > >> > - Producer includes in the ProduceRequest some information that
> > > >> > helps ensure that messages produced to a partition are monotonically
> > > >> > increasing in the partitionNum of the topic. In other words, if the
> > > >> > broker receives a ProduceRequest and notices that the producer does
> > > >> > not know the partition number has increased, the broker should reject
> > > >> > this request. That "information" may be leaderEpoch, the max
> > > >> > partitionEpoch of the partitions of the topic, or simply the
> > > >> > partitionNum of the topic. The benefit of this property is that we
> > > >> > can keep the new logic for in-order message consumption entirely in
> > > >> > how the consumer leader determines the partition -> consumer mapping.
> > >> >
> > > >> > - When the consumer leader determines the partition -> consumer
> > > >> > mapping, the leader first reads the start position for each partition
> > > >> > using OffsetFetchRequest. If the start positions are all non-zero,
> > > >> > then assignment can be done in its current manner. The assumption is
> > > >> > that a message in the new partition should only be consumed after all
> > > >> > messages with the same key produced before it have been consumed.
> > > >> > Since some messages in the new partition have been consumed, we
> > > >> > should not worry about consuming messages out-of-order. The benefit
> > > >> > of this approach is that we can avoid unnecessary overhead in the
> > > >> > common case.
> > >> >
> > > >> > - Suppose the consumer leader finds that the start position for some
> > > >> > partition is 0. Say the current partition number is 18 and the
> > > >> > partition index is 12. Then the consumer leader should ensure that
> > > >> > messages produced to partition 12 - 18/2 = 3 before the first message
> > > >> > of partition 12 are consumed before it assigns partition 12 to any
> > > >> > consumer in the consumer group. Since we have an "information" that
> > > >> > is monotonically increasing per partition, the consumer can read the
> > > >> > value of this information from the first message in partition 12, get
> > > >> > the offset corresponding to this value in partition 3, assign
> > > >> > partitions except for partition 12 (and probably other new
> > > >> > partitions) to the existing consumers, wait for the committed offset
> > > >> > to go beyond this offset for partition 3, and trigger a rebalance
> > > >> > again so that partition 3 can be reassigned to some consumer.
> > >> >
> > >> >
> > >> > Thanks,
> > >> > Dong
> > >> >
> > >> >
> > >> > On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:
> > >> >
> > >> > > Hi, Dong,
> > >> > >
> > > >> > > Thanks for the KIP. It looks good overall. We are working on a
> > > >> > > separate KIP for adding partitions while preserving the ordering
> > > >> > > guarantees. That may require another flavor of partition epoch. It's
> > > >> > > not very clear whether that partition epoch can be merged with the
> > > >> > > partition epoch in this KIP. So, perhaps you can wait on this a bit
> > > >> > > until we post the other KIP in the next few days.
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > >
> > > >> > > On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> wrote:
> > >> > >
> > >> > > > +1 on the KIP.
> > >> > > >
> > > >> > > > I think the KIP is mainly about adding the capability of tracking
> > > >> > > > the system state change lineage. It does not seem necessary to
> > > >> > > > bundle this KIP with replacing the topic partition with partition
> > > >> > > > epoch in produce/fetch. Replacing the topic-partition string with
> > > >> > > > partition epoch is essentially a performance improvement on top of
> > > >> > > > this KIP. That can probably be done separately.
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Jiangjie (Becket) Qin
> > >> > > >
> > > >> > > > On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >> > > >
> > >> > > > > Hey Colin,
> > >> > > > >
> > > >> > > > > On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > >> > > > >
> > > >> > > > > > > On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >> > > > > > >
> > >> > > > > > > > Hey Colin,
> > >> > > > > > > >
> > >> > > > > > > > I understand that the KIP will adds overhead by
> > introducing
> > >> > > > > > per-partition
> > >> > > > > > > > partitionEpoch. I am open to alternative solutions that
> > does
> > >> > not
> > >> > > > > incur
> > >> > > > > > > > additional overhead. But I don't see a better way now.
> > >> > > > > > > >
> > >> > > > > > > > IMO the overhead in the FetchResponse may not be that
> > much.
> > >> We
> > >> > > > > probably
> > >> > > > > > > > should discuss the percentage increase rather than the
> > >> absolute
> > >> > > > > number
> > >> > > > > > > > increase. Currently after KIP-227, per-partition header
> > has
> > >> 23
> > >> > > > bytes.
> > >> > > > > > This
> > >> > > > > > > > KIP adds another 4 bytes. Assume the records size is
> 10KB,
> > >> the
> > >> > > > > > percentage
> > >> > > > > > > > increase is 4 / (23 + 10000) = 0.03%. It seems
> negligible,
> > >> > right?
> > >> > > > > >
> > >> > > > > > Hi Dong,
> > >> > > > > >
> > >> > > > > > Thanks for the response.  I agree that the FetchRequest /
> > >> > > FetchResponse
> > >> > > > > > overhead should be OK, now that we have incremental fetch
> > >> requests
> > >> > > and
> > >> > > > > > responses.  However, there are a lot of cases where the
> > >> percentage
> > >> > > > > increase
> > >> > > > > > is much greater.  For example, if a client is doing full
> > >> > > > > MetadataRequests /
> > >> > > > > > Responses, we have some math kind of like this per
> partition:
> > >> > > > > >
> > >> > > > > > > UpdateMetadataRequestPartitionState => topic partition
> > >> > > > > controller_epoch
> > >> > > > > > leader  leader_epoch partition_epoch isr zk_version replicas
> > >> > > > > > offline_replicas
> > >> > > > > > > 14 bytes:  topic => string (assuming about 10 byte topic
> > >> names)
> > >> > > > > > > 4 bytes:  partition => int32
> > >> > > > > > > 4  bytes: conroller_epoch => int32
> > >> > > > > > > 4  bytes: leader => int32
> > >> > > > > > > 4  bytes: leader_epoch => int32
> > >> > > > > > > +4 EXTRA bytes: partition_epoch => int32        <-- NEW
> > >> > > > > > > 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
> > >> > > > > > > 4 bytes: zk_version => int32
> > >> > > > > > > 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
> > >> > > > > > > 2  offline_replicas => [int32] (assuming no offline
> > replicas)
> > >> > > > > >
> > >> > > > > > Assuming I added that up correctly, the per-partition
> overhead
> > >> goes
> > >> > > > from
> > >> > > > > > 64 bytes per partition to 68, a 6.2% increase.
> > >> > > > > >
> > >> > > > > > We could do similar math for a lot of the other RPCs.  And
> you
> > >> will
> > >> > > > have
> > >> > > > > a
> > >> > > > > > similar memory and garbage collection impact on the brokers
> > >> since
> > >> > you
> > >> > > > > have
> > >> > > > > > to store all this extra state as well.
> > >> > > > > >
> > >> > > > >
> > >> > > > > That is correct. IMO the Metadata is only updated periodically
> > >> and is
> > >> > > > > probably not a big deal if we increase it by 6%. The
> > FetchResponse
> > >> > and
> > >> > > > > ProduceRequest are probably the only requests that are bounded
> > by
> > >> the
> > >> > > > > bandwidth throughput.
> > >> > > > >
> > >> > > > >
> > >> > > > > >
> > >> > > > > > > >
> > >> > > > > > > > I agree that we can probably save more space by using
> > >> partition
> > >> > > ID
> > >> > > > so
> > >> > > > > > that
> > >> > > > > > > > we no longer needs the string topic name. The similar
> idea
> > >> has
> > >> > > also
> > >> > > > > > been
> > >> > > > > > > > put in the Rejected Alternative section in KIP-227.
> While
> > >> this
> > >> > > idea
> > >> > > > > is
> > >> > > > > > > > promising, it seems orthogonal to the goal of this KIP.
> > >> Given
> > >> > > that
> > >> > > > > > there is
> > >> > > > > > > > already many work to do in this KIP, maybe we can do the
> > >> > > partition
> > >> > > > ID
> > >> > > > > > in a
> > >> > > > > > > > separate KIP?
> > >> > > > > >
> > >> > > > > > I guess my thinking is that the goal here is to replace an
> > >> > identifier
> > >> > > > > > which can be re-used (the tuple of topic name, partition ID)
> > >> with
> > >> > an
> > >> > > > > > identifier that cannot be re-used (the tuple of topic name,
> > >> > partition
> > >> > > > ID,
> > >> > > > > > partition epoch) in order to gain better semantics.  As long
> > as
> > >> we
> > >> > > are
> > >> > > > > > replacing the identifier, why not replace it with an
> > identifier
> > >> > that
> > >> > > > has
> > >> > > > > > important performance advantages?  The KIP freeze for the
> next
> > >> > > release
> > >> > > > > has
> > >> > > > > > already passed, so there is time to do this.
> > >> > > > > >
> > >> > > > >
> > >> > > > > In general it can be easier for discussion and implementation
> if
> > >> we
> > >> > can
> > >> > > > > split a larger task into smaller and independent tasks. For
> > >> example,
> > >> > > > > KIP-112 and KIP-113 both deals with the JBOD support. KIP-31,
> > >> KIP-32
> > >> > > and
> > >> > > > > KIP-33 are about timestamp support. The option on this can be
> > >> subject
> > >> > > > > though.
> > >> > > > >
> > >> > > > > IMO the change to switch from (topic, partition ID) to
> > >> partitionEpch
> > >> > in
> > >> > > > all
> > >> > > > > request/response requires us to going through all request one
> by
> > >> one.
> > >> > > It
> > >> > > > > may not be hard but it can be time consuming and tedious. At
> > high
> > >> > level
> > >> > > > the
> > >> > > > > goal and the change for that will be orthogonal to the changes
> > >> > required
> > >> > > > in
> > >> > > > > this KIP. That is the main reason I think we can split them
> into
> > >> two
> > >> > > > KIPs.
> > >> > > > >
> > >> > > > >
> > >> > > > > > On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:
> > >> > > > > > > I think it is possible to move to entirely use
> > partitionEpoch
> > >> > > instead
> > >> > > > > of
> > >> > > > > > > (topic, partition) to identify a partition. Client can
> > obtain
> > >> the
> > >> > > > > > > partitionEpoch -> (topic, partition) mapping from
> > >> > MetadataResponse.
> > >> > > > We
> > >> > > > > > > probably need to figure out a way to assign partitionEpoch
> > to
> > >> > > > existing
> > >> > > > > > > partitions in the cluster. But this should be doable.
> > >> > > > > > >
> > >> > > > > > > This is a good idea. I think it will save us some space in
> > the
> > >> > > > > > > request/response. The actual space saving in percentage
> > >> probably
> > >> > > > > depends
> > >> > > > > > on
> > >> > > > > > > the amount of data and the number of partitions of the
> same
> > >> > topic.
> > >> > > I
> > >> > > > > just
> > >> > > > > > > think we can do it in a separate KIP.
> > >> > > > > >
> > >> > > > > > Hmm.  How much extra work would be required?  It seems like
> we
> > >> are
> > >> > > > > already
> > >> > > > > > changing almost every RPC that involves topics and
> partitions,
> > >> > > already
> > >> > > > > > adding new per-partition state to ZooKeeper, already
> changing
> > >> how
> > >> > > > clients
> > >> > > > > > interact with partitions.  Is there some other big piece of
> > work
> > >> > we'd
> > >> > > > > have
> > >> > > > > > to do to move to partition IDs that we wouldn't need for
> > >> partition
> > >> > > > > epochs?
> > >> > > > > > I guess we'd have to find a way to support regular
> > >> expression-based
> > >> > > > topic
> > >> > > > > > subscriptions.  If we split this into multiple KIPs,
> wouldn't
> > we
> > >> > end
> > >> > > up
> > >> > > > > > changing all that RPCs and ZK state a second time?  Also,
> I'm
> > >> > curious
> > >> > > > if
> > >> > > > > > anyone has done any proof of concept GC, memory, and network
> > >> usage
> > >> > > > > > measurements on switching topic names for topic IDs.
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > We will need to go over all requests/responses to check how to
> > >> > replace
> > >> > > > > (topic, partition ID) with partition epoch. It requires
> > >> non-trivial
> > >> > > work
> > >> > > > > and could take time. As you mentioned, we may want to see how
> > much
> > >> > > saving
> > >> > > > > we can get by switching from topic names to partition epoch.
> > That
> > >> > > itself
> > >> > > > > requires time and experiment. It seems that the new idea does
> > not
> > >> > > > rollback
> > >> > > > > any change proposed in this KIP. So I am not sure we can get
> > much
> > >> by
> > >> > > > > putting them into the same KIP.
> > >> > > > >
> > >> > > > > Anyway, if more people are interested in seeing the new idea
> in
> > >> the
> > >> > > same
> > >> > > > > KIP, I can try that.
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > >
> > >> > > > > > best,
> > >> > > > > > Colin
> > >> > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <
> > >> > > cmcc...@apache.org
> > >> > > > >
> > >> > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:
> > >> > > > > > > >> > Hey Colin,
> > >> > > > > > > >> >
> > >> > > > > > > >> >
> > >> > > > > > > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <
> > >> > > > > cmcc...@apache.org>
> > >> > > > > > > >> wrote:
> > >> > > > > > > >> >
> > >> > > > > > > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
> > >> > > > > > > >> > > > Hey Colin,
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > > Thanks for the comment.
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <
> > >> > > > > > cmcc...@apache.org>
> > >> > > > > > > >> > > wrote:
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> > >> > > > > > > >> > > > > > Hey Colin,
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > Thanks for reviewing the KIP.
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > If I understand you right, you maybe
> suggesting
> > >> that
> > >> > > we
> > >> > > > > can
> > >> > > > > > use
> > >> > > > > > > >> a
> > >> > > > > > > >> > > global
> > >> > > > > > > >> > > > > > metadataEpoch that is incremented every time
> > >> > > controller
> > >> > > > > > updates
> > >> > > > > > > >> > > metadata.
> > >> > > > > > > >> > > > > > The problem with this solution is that, if a
> > >> topic
> > >> > is
> > >> > > > > > deleted
> > >> > > > > > > >> and
> > >> > > > > > > >> > > created
> > >> > > > > > > >> > > > > > again, user will not know whether that the
> > offset
> > >> > > which
> > >> > > > is
> > >> > > > > > > >> stored
> > >> > > > > > > >> > > before
> > >> > > > > > > >> > > > > > the topic deletion is no longer valid. This
> > >> > motivates
> > >> > > > the
> > >> > > > > > idea
> > >> > > > > > > >> to
> > >> > > > > > > >> > > include
> > >> > > > > > > >> > > > > > per-partition partitionEpoch. Does this sound
> > >> > > > reasonable?
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > > > > Hi Dong,
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > > > > Perhaps we can store the last valid offset of
> > each
> > >> > > deleted
> > >> > > > > > topic
> > >> > > > > > > >> in
> > >> > > > > > > >> > > > > ZooKeeper.  Then, when a topic with one of
> those
> > >> names
> > >> > > > gets
> > >> > > > > > > >> > > re-created, we
> > >> > > > > > > >> > > > > can start the topic at the previous end offset
> > >> rather
> > >> > > than
> > >> > > > > at
> > >> > > > > > 0.
> > >> > > > > > > >> This
> > >> > > > > > > >> > > > > preserves immutability.  It is no more
> burdensome
> > >> than
> > >> > > > > having
> > >> > > > > > to
> > >> > > > > > > >> > > preserve a
> > >> > > > > > > >> > > > > "last epoch" for the deleted partition
> somewhere,
> > >> > right?
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > > My concern with this solution is that the number
> of
> > >> > > > zookeeper
> > >> > > > > > nodes
> > >> > > > > > > >> get
> > >> > > > > > > >> > > > more and more over time if some users keep
> deleting
> > >> and
> > >> > > > > creating
> > >> > > > > > > >> topics.
> > >> > > > > > > >> > > Do
> > >> > > > > > > >> > > > you think this can be a problem?
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > Hi Dong,
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > We could expire the "partition tombstones" after an
> > >> hour
> > >> > or
> > >> > > > so.
> > >> > > > > > In
> > >> > > > > > > >> > > practice this would solve the issue for clients
> that
> > >> like
> > >> > to
> > >> > > > > > destroy
> > >> > > > > > > >> and
> > >> > > > > > > >> > > re-create topics all the time.  In any case,
> doesn't
> > >> the
> > >> > > > current
> > >> > > > > > > >> proposal
> > >> > > > > > > >> > > add per-partition znodes as well that we have to
> > track
> > >> > even
> > >> > > > > after
> > >> > > > > > the
> > >> > > > > > > >> > > partition is deleted?  Or did I misunderstand that?
> > >> > > > > > > >> > >
> > >> > > > > > > >> >
> > >> > > > > > > >> > Actually the current KIP does not add per-partition
> > >> znodes.
> > >> > > > Could
> > >> > > > > > you
> > >> > > > > > > >> > double check? I can fix the KIP wiki if there is
> > anything
> > >> > > > > > misleading.
> > >> > > > > > > >>
> > >> > > > > > > >> Hi Dong,
> > >> > > > > > > >>
> > >> > > > > > > >> I double-checked the KIP, and I can see that you are in
> > >> fact
> > >> > > > using a
> > >> > > > > > > >> global counter for initializing partition epochs.  So,
> > you
> > >> are
> > >> > > > > > correct, it
> > >> > > > > > > >> doesn't add per-partition znodes for partitions that no
> > >> longer
> > >> > > > > exist.
> > >> > > > > > > >>
> > >> > > > > > > >> >
> > >> > > > > > > >> > If we expire the "partition tomstones" after an hour,
> > and
> > >> > the
> > >> > > > > topic
> > >> > > > > > is
> > >> > > > > > > >> > re-created after more than an hour since the topic
> > >> deletion,
> > >> > > > then
> > >> > > > > > we are
> > >> > > > > > > >> > back to the situation where user can not tell whether
> > the
> > >> > > topic
> > >> > > > > has
> > >> > > > > > been
> > >> > > > > > > >> > re-created or not, right?
> > >> > > > > > > >>
> > >> > > > > > > >> Yes, with an expiration period, it would not ensure
> > >> > > immutability--
> > >> > > > > you
> > >> > > > > > > >> could effectively reuse partition names and they would
> > look
> > >> > the
> > >> > > > > same.
> > >> > > > > > > >>
> > >> > > > > > > >> >
> > >> > > > > > > >> >
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > It's not really clear to me what should happen
> when a
> > >> > topic
> > >> > > is
> > >> > > > > > > >> destroyed
> > >> > > > > > > >> > > and re-created with new data.  Should consumers
> > >> continue
> > >> > to
> > >> > > be
> > >> > > > > > able to
> > >> > > > > > > >> > > consume?  We don't know where they stopped
> consuming
> > >> from
> > >> > > the
> > >> > > > > > previous
> > >> > > > > > > >> > > incarnation of the topic, so messages may have been
> > >> lost.
> > >> > > > > > Certainly
> > >> > > > > > > >> > > consuming data from offset X of the new incarnation
> > of
> > >> the
> > >> > > > topic
> > >> > > > > > may
> > >> > > > > > > >> give
> > >> > > > > > > >> > > something totally different from what you would
> have
> > >> > gotten
> > >> > > > from
> > >> > > > > > > >> offset X
> > >> > > > > > > >> > > of the previous incarnation of the topic.
> > >> > > > > > > >> > >
> > >> > > > > > > >> >
> > >> > > > > > > >> > With the current KIP, if a consumer consumes a topic
> > >> based
> > >> > on
> > >> > > > the
> > >> > > > > > last
> > >> > > > > > > >> > remembered (offset, partitionEpoch, leaderEpoch), and
> > if
> > >> the
> > >> > > > topic
> > >> > > > > > is
> > >> > > > > > > >> > re-created, consume will throw
> > >> > InvalidPartitionEpochException
> > >> > > > > > because
> > >> > > > > > > >> the
> > >> > > > > > > >> > previous partitionEpoch will be different from the
> > >> current
> > >> > > > > > > >> partitionEpoch.
> > >> > > > > > > >> > This is described in the Proposed Changes ->
> > Consumption
> > >> > after
> > >> > > > > topic
> > >> > > > > > > >> > deletion in the KIP. I can improve the KIP if there
> is
> > >> > > anything
> > >> > > > > not
> > >> > > > > > > >> clear.
> > >> > > > > > > >>
> > >> > > > > > > >> Thanks for the clarification.  It sounds like what you
> > >> really
> > >> > > want
> > >> > > > > is
> > >> > > > > > > >> immutability-- i.e., to never "really" reuse partition
> > >> > > > identifiers.
> > >> > > > > > And
> > >> > > > > > > >> you do this by making the partition name no longer the
> > >> "real"
> > >> > > > > > identifier.
> > >> > > > > > > >>
> > >> > > > > > > >> My big concern about this KIP is that it seems like an
> > >> > > > > > anti-scalability
> > >> > > > > > > >> feature.  Now we are adding 4 extra bytes for every
> > >> partition
> > >> > in
> > >> > > > the
> > >> > > > > > > >> FetchResponse and Request, for example.  That could be
> 40
> > >> kb
> > >> > per
> > >> > > > > > request,
> > >> > > > > > > >> if the user has 10,000 partitions.  And of course, the
> > KIP
> > >> > also
> > >> > > > > makes
> > >> > > > > > > >> massive changes to UpdateMetadataRequest,
> > MetadataResponse,
> > >> > > > > > > >> OffsetCommitRequest, OffsetFetchResponse,
> > >> LeaderAndIsrRequest,
> > >> > > > > > > >> ListOffsetResponse, etc. which will also increase their
> > >> size
> > >> > on
> > >> > > > the
> > >> > > > > > wire
> > >> > > > > > > >> and in memory.
> > >> > > > > > > >>
> > >> > > > > > > >> One thing that we talked a lot about in the past is
> > >> replacing
> > >> > > > > > partition
> > >> > > > > > > >> names with IDs.  IDs have a lot of really nice
> features.
> > >> They
> > >> > > > take
> > >> > > > > > up much
> > >> > > > > > > >> less space in memory than strings (especially 2-byte
> Java
> > >> > > > strings).
> > >> > > > > > They
> > >> > > > > > > >> can often be allocated on the stack rather than the
> heap
> > >> > > > (important
> > >> > > > > > when
> > >> > > > > > > >> you are dealing with hundreds of thousands of them).
> > They
> > >> can
> > >> > > be
> > >> > > > > > > >> efficiently deserialized and serialized.  If we use
> > 64-bit
> > >> > ones,
> > >> > > > we
> > >> > > > > > will
> > >> > > > > > > >> never run out of IDs, which means that they can always
> be
> > >> > unique
> > >> > > > per
> > >> > > > > > > >> partition.
> > >> > > > > > > >>
> > >> > > > > > > >> Given that the partition name is no longer the "real"
> > >> > identifier
> > >> > > > for
> > >> > > > > > > >> partitions in the current KIP-232 proposal, why not
> just
> > >> move
> > >> > to
> > >> > > > > using
> > >> > > > > > > >> partition IDs entirely instead of strings?  You have to
> > >> change
> > >> > > all
> > >> > > > > the
> > >> > > > > > > >> messages anyway.  There isn't much point any more to
> > >> carrying
> > >> > > > around
> > >> > > > > > the
> > >> > > > > > > >> partition name in every RPC, since you really need
> (name,
> > >> > epoch)
> > >> > > > to
> > >> > > > > > > >> identify the partition.
> > >> > > > > > > >> Probably the metadata response and a few other messages
> > >> would
> > >> > > have
> > >> > > > > to
> > >> > > > > > > >> still carry the partition name, to allow clients to go
> > from
> > >> > name
> > >> > > > to
> > >> > > > > > id.
> > >> > > > > > > >> But we could mostly forget about the strings.  And then
> > >> this
> > >> > > would
> > >> > > > > be
> > >> > > > > > a
> > >> > > > > > > >> scalability improvement rather than a scalability
> > problem.
> > >> > > > > > > >>
> > >> > > > > > > >> >
> > >> > > > > > > >> >
> > >> > > > > > > >> > > By choosing to reuse the same (topic, partition,
> > >> offset)
> > >> > > > > 3-tuple,
> > >> > > > > > we
> > >> > > > > > > >> have
> > >> > > > > > > >> >
> > >> > > > > > > >> > chosen to give up immutability.  That was a really
> bad
> > >> > > decision.
> > >> > > > > > And
> > >> > > > > > > >> now
> > >> > > > > > > >> > > we have to worry about time dependencies, stale
> > cached
> > >> > data,
> > >> > > > and
> > >> > > > > > all
> > >> > > > > > > >> the
> > >> > > > > > > >> > > rest.  We can't completely fix this inside Kafka no
> > >> matter
> > >> > > > what
> > >> > > > > > we do,
> > >> > > > > > > >> > > because not all that cached data is inside Kafka
> > >> itself.
> > >> > > Some
> > >> > > > > of
> > >> > > > > > it
> > >> > > > > > > >> may be
> > >> > > > > > > >> > > in systems that Kafka has sent data to, such as
> other
> > >> > > daemons,
> > >> > > > > SQL
> > >> > > > > > > >> > > databases, streams, and so forth.
> > >> > > > > > > >> > >
> > >> > > > > > > >> >
> > >> > > > > > > >> > The current KIP will uniquely identify a message
> using
> > >> > (topic,
> > >> > > > > > > >> partition,
> > >> > > > > > > >> > offset, partitionEpoch) 4-tuple. This addresses the
> > >> message
> > >> > > > > > immutability
> > >> > > > > > > >> > issue that you mentioned. Is there any corner case
> > where
> > >> the
> > >> > > > > message
> > >> > > > > > > >> > immutability is still not preserved with the current
> > KIP?
> > >> > > > > > > >> >
> > >> > > > > > > >> >
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > I guess the idea here is that mirror maker should
> > work
> > >> as
> > >> > > > > expected
> > >> > > > > > > >> when
> > >> > > > > > > >> > > users destroy a topic and re-create it with the
> same
> > >> name.
> > >> > > > > That's
> > >> > > > > > > >> kind of
> > >> > > > > > > >> > > tough, though, since in that scenario, mirror maker
> > >> > probably
> > >> > > > > > should
> > >> > > > > > > >> destroy
> > >> > > > > > > >> > > and re-create the topic on the other end, too,
> right?
> > >> > > > > Otherwise,
> > >> > > > > > > >> what you
> > >> > > > > > > >> > > end up with on the other end could be half of one
> > >> > > incarnation
> > >> > > > of
> > >> > > > > > the
> > >> > > > > > > >> topic,
> > >> > > > > > > >> > > and half of another.
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > What mirror maker really needs is to be able to
> > follow
> > >> a
> > >> > > > stream
> > >> > > > > of
> > >> > > > > > > >> events
> > >> > > > > > > >> > > about the kafka cluster itself.  We could have some
> > >> master
> > >> > > > topic
> > >> > > > > > > >> which is
> > >> > > > > > > >> > > always present and which contains data about all
> > topic
> > >> > > > > deletions,
> > >> > > > > > > >> > > creations, etc.  Then MM can simply follow this
> topic
> > >> and
> > >> > do
> > >> > > > > what
> > >> > > > > > is
> > >> > > > > > > >> needed.
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > Then the next question maybe, should we use a
> > >> global
> > >> > > > > > > >> metadataEpoch +
> > >> > > > > > > >> > > > > > per-partition partitionEpoch, instead of
> using
> > >> > > > > per-partition
> > >> > > > > > > >> > > leaderEpoch
> > >> > > > > > > >> > > > > +
> > >> > > > > > > >> > > > > > per-partition partitionEpoch. The former
> solution
> > >> using
> > >> > > > > > > >> metadataEpoch
> > >> > > > > > > >> > > would
> > >> > > > > > > >> > > > > > not work due to the following scenario
> > (provided
> > >> by
> > >> > > > Jun):
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > "Consider the following scenario. In metadata
> > v1,
> > >> > the
> > >> > > > > leader
> > >> > > > > > > >> for a
> > >> > > > > > > >> > > > > > partition is at broker 1. In metadata v2,
> > leader
> > >> is
> > >> > at
> > >> > > > > > broker
> > >> > > > > > > >> 2. In
> > >> > > > > > > >> > > > > > metadata v3, leader is at broker 1 again. The
> > >> last
> > >> > > > > committed
> > >> > > > > > > >> offset
> > >> > > > > > > >> > > in
> > >> > > > > > > >> > > > > v1,
> > >> > > > > > > >> > > > > > v2 and v3 are 10, 20 and 30, respectively. A
> > >> > consumer
> > >> > > is
> > >> > > > > > > >> started and
> > >> > > > > > > >> > > > > reads
> > >> > > > > > > >> > > > > > metadata v1 and reads messages from offset 0
> to
> > >> 25
> > >> > > from
> > >> > > > > > broker
> > >> > > > > > > >> 1. My
> > >> > > > > > > >> > > > > > understanding is that in the current
> proposal,
> > >> the
> > >> > > > > metadata
> > >> > > > > > > >> version
> > >> > > > > > > >> > > > > > associated with offset 25 is v1. The consumer
> > is
> > >> > then
> > >> > > > > > restarted
> > >> > > > > > > >> and
> > >> > > > > > > >> > > > > fetches
> > >> > > > > > > >> > > > > > metadata v2. The consumer tries to read from
> > >> broker
> > >> > 2,
> > >> > > > > > which is
> > >> > > > > > > >> the
> > >> > > > > > > >> > > old
> > >> > > > > > > >> > > > > > leader with the last offset at 20. In this
> > case,
> > >> the
> > >> > > > > > consumer
> > >> > > > > > > >> will
> > >> > > > > > > >> > > still
> > >> > > > > > > >> > > > > > get OffsetOutOfRangeException incorrectly."
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > Regarding your comment "For the second
> purpose,
> > >> this
> > >> > > is
> > >> > > > > > "soft
> > >> > > > > > > >> state"
> > >> > > > > > > >> > > > > > anyway.  If the client thinks X is the leader
> > >> but Y
> > >> > is
> > >> > > > > > really
> > >> > > > > > > >> the
> > >> > > > > > > >> > > leader,
> > >> > > > > > > >> > > > > > the client will talk to X, and X will point
> out
> > >> its
> > >> > > > > mistake
> > >> > > > > > by
> > >> > > > > > > >> > > sending
> > >> > > > > > > >> > > > > back
> > >> > > > > > > >> > > > > > a NOT_LEADER_FOR_PARTITION.", it is probably
> not
> > >> > true.
> > >> > > > The
> > >> > > > > > > >> problem
> > >> > > > > > > >> > > here is
> > >> > > > > > > >> > > > > > that the old leader X may still think it is
> the
> > >> > leader
> > >> > > > of
> > >> > > > > > the
> > >> > > > > > > >> > > partition
> > >> > > > > > > >> > > > > and
> > >> > > > > > > >> > > > > > thus it will not send back
> > >> NOT_LEADER_FOR_PARTITION.
> > >> > > The
> > >> > > > > > reason
> > >> > > > > > > >> is
> > >> > > > > > > >> > > > > provided
> > >> > > > > > > >> > > > > > in KAFKA-6262. Can you check if that makes
> > sense?
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > > > > This is solvable with a timeout, right?  If the
> > >> leader
> > >> > > > can't
> > >> > > > > > > >> > > communicate
> > >> > > > > > > >> > > > > with the controller for a certain period of
> time,
> > >> it
> > >> > > > should
> > >> > > > > > stop
> > >> > > > > > > >> > > acting as
> > >> > > > > > > >> > > > > the leader.  We have to solve this problem,
> > >> anyway, in
> > >> > > > order
> > >> > > > > > to
> > >> > > > > > > >> fix
> > >> > > > > > > >> > > all the
> > >> > > > > > > >> > > > > corner cases.
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > > Not sure if I fully understand your proposal. The
> > >> > proposal
> > >> > > > > > seems to
> > >> > > > > > > >> > > require
> > >> > > > > > > >> > > > non-trivial changes to our existing leadership
> > >> election
> > >> > > > > > mechanism.
> > >> > > > > > > >> Could
> > >> > > > > > > >> > > > you provide more detail regarding how it works?
> For
> > >> > > example,
> > >> > > > > how
> > >> > > > > > > >> should
> > >> > > > > > > >> > > > user choose this timeout, how leader determines
> > >> whether
> > >> > it
> > >> > > > can
> > >> > > > > > still
> > >> > > > > > > >> > > > communicate with controller, and how this
> triggers
> > >> > > > controller
> > >> > > > > to
> > >> > > > > > > >> elect
> > >> > > > > > > >> > > new
> > >> > > > > > > >> > > > leader?
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > Before I come up with any proposal, let me make
> sure
> > I
> > >> > > > > understand
> > >> > > > > > the
> > >> > > > > > > >> > > problem correctly.  My big question was, what
> > prevents
> > >> > > > > split-brain
> > >> > > > > > > >> here?
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > Let's say I have a partition which is on nodes A,
> B,
> > >> and
> > >> > C,
> > >> > > > with
> > >> > > > > > > >> min-ISR
> > >> > > > > > > >> > > 2.  The controller is D.  At some point, there is a
> > >> > network
> > >> > > > > > partition
> > >> > > > > > > >> > > between A and B and the rest of the cluster.  The
> > >> > Controller
> > >> > > > > > > >> re-assigns the
> > >> > > > > > > >> > > partition to nodes C, D, and E.  But A and B keep
> > >> chugging
> > >> > > > away,
> > >> > > > > > even
> > >> > > > > > > >> > > though they can no longer communicate with the
> > >> controller.
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > At some point, a client with stale metadata writes
> to
> > >> the
> > >> > > > > > partition.
> > >> > > > > > > >> It
> > >> > > > > > > >> > > still thinks the partition is on node A, B, and C,
> so
> > >> > that's
> > >> > > > > > where it
> > >> > > > > > > >> sends
> > >> > > > > > > >> > > the data.  It's unable to talk to C, but A and B
> > reply
> > >> > back
> > >> > > > that
> > >> > > > > > all
> > >> > > > > > > >> is
> > >> > > > > > > >> > > well.
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > Is this not a case where we could lose data due to
> > >> split
> > >> > > > brain?
> > >> > > > > > Or is
> > >> > > > > > > >> > > there a mechanism for preventing this that I
> missed?
> > >> If
> > >> > it
> > >> > > > is,
> > >> > > > > it
> > >> > > > > > > >> seems
> > >> > > > > > > >> > > like a pretty serious failure case that we should
> be
> > >> > > handling
> > >> > > > > > with our
> > >> > > > > > > >> > > metadata rework.  And I think epoch numbers and
> > >> timeouts
> > >> > > might
> > >> > > > > be
> > >> > > > > > > >> part of
> > >> > > > > > > >> > > the solution.
> > >> > > > > > > >> > >
> > >> > > > > > > >> >
> > >> > > > > > > >> > Right, split brain can happen if RF=4 and minIsr=2.
> > >> > However, I
> > >> > > > am
> > >> > > > > > not
> > >> > > > > > > >> sure
> > >> > > > > > > >> > it is a pretty serious issue which we need to address
> > >> today.
> > >> > > > This
> > >> > > > > > can be
> > >> > > > > > > >> > prevented by configuring the Kafka topic so that
> > minIsr >
> > >> > > RF/2.
> > >> > > > > > > >> Actually,
> > >> > > > > > > >> > if user sets minIsr=2, is there any reason that
> > user
> > >> > > wants
> > >> > > > to
> > >> > > > > > set
> > >> > > > > > > >> RF=4
> > >> > > > > > > >> > instead of 3?
> > >> > > > > > > >> >
> > >> > > > > > > >> > Introducing timeout in leader election mechanism is
> > >> > > > non-trivial. I
> > >> > > > > > > >> think we
> > >> > > > > > > >> > probably want to do that only if there is good
> use-case
> > >> that
> > >> > > can
> > >> > > > > not
> > >> > > > > > > >> > otherwise be addressed with the current mechanism.
> > >> > > > > > > >>
> > >> > > > > > > >> I still would like to think about these corner cases
> > more.
> > >> > But
> > >> > > > > > perhaps
> > >> > > > > > > >> it's not directly related to this KIP.
> > >> > > > > > > >>
> > >> > > > > > > >> regards,
> > >> > > > > > > >> Colin
> > >> > > > > > > >>
> > >> > > > > > > >>
> > >> > > > > > > >> >
> > >> > > > > > > >> >
> > >> > > > > > > >> > > best,
> > >> > > > > > > >> > > Colin
> > >> > > > > > > >> > >
> > >> > > > > > > >> > >
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > >
> > >> > > > > > > >> > > > > best,
> > >> > > > > > > >> > > > > Colin
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > Regards,
> > >> > > > > > > >> > > > > > Dong
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin
> McCabe
> > <
> > >> > > > > > > >> cmcc...@apache.org>
> > >> > > > > > > >> > > > > wrote:
> > >> > > > > > > >> > > > > >
> > >> > > > > > > >> > > > > > > Hi Dong,
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > Thanks for proposing this KIP.  I think a
> > >> metadata
> > >> > > > epoch
> > >> > > > > > is a
> > >> > > > > > > >> > > really
> > >> > > > > > > >> > > > > good
> > >> > > > > > > >> > > > > > > idea.
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > I read through the DISCUSS thread, but I
> > still
> > >> > don't
> > >> > > > > have
> > >> > > > > > a
> > >> > > > > > > >> clear
> > >> > > > > > > >> > > > > picture
> > >> > > > > > > >> > > > > > > of why the proposal uses a metadata epoch
> per
> > >> > > > partition
> > >> > > > > > rather
> > >> > > > > > > >> > > than a
> > >> > > > > > > >> > > > > > > global metadata epoch.  A metadata epoch
> per
> > >> > > partition
> > >> > > > > is
> > >> > > > > > > >> kind of
> > >> > > > > > > >> > > > > > > unpleasant-- it's at least 4 extra bytes
> per
> > >> > > partition
> > >> > > > > > that we
> > >> > > > > > > >> > > have to
> > >> > > > > > > >> > > > > send
> > >> > > > > > > >> > > > > > > over the wire in every full metadata
> request,
> > >> > which
> > >> > > > > could
> > >> > > > > > > >> become
> > >> > > > > > > >> > > extra
> > >> > > > > > > >> > > > > > > kilobytes on the wire when the number of
> > >> > partitions
> > >> > > > > > becomes
> > >> > > > > > > >> large.
> > >> > > > > > > >> > > > > Plus,
> > >> > > > > > > >> > > > > > > we have to update all the auxiliary classes
> > to
> > >> > > include
> > >> > > > > an
> > >> > > > > > > >> epoch.
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > We need to have a global metadata epoch
> > anyway
> > >> to
> > >> > > > handle
> > >> > > > > > > >> partition
> > >> > > > > > > >> > > > > > > addition and deletion.  For example, if I
> > give
> > >> you
> > >> > > > > > > >> > > > > > > MetadataResponse{part1,epoch 1, part2,
> epoch
> > 1}
> > >> > and
> > >> > > > > > {part1,
> > >> > > > > > > >> > > epoch1},
> > >> > > > > > > >> > > > > which
> > >> > > > > > > >> > > > > > > MetadataResponse is newer?  You have no way
> > of
> > >> > > > knowing.
> > >> > > > > > It
> > >> > > > > > > >> could
> > >> > > > > > > >> > > be
> > >> > > > > > > >> > > > > that
> > >> > > > > > > >> > > > > > > part2 has just been created, and the
> response
> > >> > with 2
> > >> > > > > > > >> partitions is
> > >> > > > > > > >> > > > > newer.
> > >> > > > > > > >> > > > > > > Or it could be that part2 has just been
> > >> deleted,
> > >> > and
> > >> > > > > > > >> therefore the
> > >> > > > > > > >> > > > > response
> > >> > > > > > > >> > > > > > > with 1 partition is newer.  You must have a
> > >> global
> > >> > > > epoch
> > >> > > > > > to
> > >> > > > > > > >> > > > > disambiguate
> > >> > > > > > > >> > > > > > > these two cases.
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > Previously, I worked on the Ceph
> distributed
> > >> > > > filesystem.
> > >> > > > > > > >> Ceph had
> > >> > > > > > > >> > > the
> > >> > > > > > > >> > > > > > > concept of a map of the whole cluster,
> > >> maintained
> > >> > > by a
> > >> > > > > few
> > >> > > > > > > >> servers
> > >> > > > > > > >> > > > > doing
> > >> > > > > > > >> > > > > > > paxos.  This map was versioned by a single
> > >> 64-bit
> > >> > > > epoch
> > >> > > > > > number
> > >> > > > > > > >> > > which
> > >> > > > > > > >> > > > > > > increased on every change.  It was
> propagated
> > >> to
> > >> > > > clients
> > >> > > > > > > >> through
> > >> > > > > > > >> > > > > gossip.  I
> > >> > > > > > > >> > > > > > > wonder if something similar could work
> here?
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > It seems like the Kafka
> MetadataResponse
> > >> > serves
> > >> > > > two
> > >> > > > > > > >> somewhat
> > >> > > > > > > >> > > > > unrelated
> > >> > > > > > > >> > > > > > > purposes.  Firstly, it lets clients know
> what
> > >> > > > partitions
> > >> > > > > > > >> exist in
> > >> > > > > > > >> > > the
> > >> > > > > > > >> > > > > > > system and where they live.  Secondly, it
> > lets
> > >> > > clients
> > >> > > > > > know
> > >> > > > > > > >> which
> > >> > > > > > > >> > > nodes
> > >> > > > > > > >> > > > > > > within the partition are in-sync (in the
> ISR)
> > >> and
> > >> > > > which
> > >> > > > > > node
> > >> > > > > > > >> is the
> > >> > > > > > > >> > > > > leader.
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > The first purpose is what you really need a
> > >> > metadata
> > >> > > > > epoch
> > >> > > > > > > >> for, I
> > >> > > > > > > >> > > > > think.
> > >> > > > > > > >> > > > > > > You want to know whether a partition exists
> > or
> > >> > not,
> > >> > > or
> > >> > > > > you
> > >> > > > > > > >> want to
> > >> > > > > > > >> > > know
> > >> > > > > > > >> > > > > > > which nodes you should talk to in order to
> > >> write
> > >> > to
> > >> > > a
> > >> > > > > > given
> > >> > > > > > > >> > > > > partition.  A
> > >> > > > > > > >> > > > > > > single metadata epoch for the whole
> response
> > >> > should
> > >> > > be
> > >> > > > > > > >> adequate
> > >> > > > > > > >> > > here.
> > >> > > > > > > >> > > > > We
> > >> > > > > > > >> > > > > > > should not change the partition assignment
> > >> without
> > >> > > > going
> > >> > > > > > > >> through
> > >> > > > > > > >> > > > > zookeeper
> > >> > > > > > > >> > > > > > > (or a similar system), and this inherently
> > >> > > serializes
> > >> > > > > > updates
> > >> > > > > > > >> into
> > >> > > > > > > >> > > a
> > >> > > > > > > >> > > > > > > numbered stream.  Brokers should also stop
> > >> > > responding
> > >> > > > to
> > >> > > > > > > >> requests
> > >> > > > > > > >> > > when
> > >> > > > > > > >> > > > > they
> > >> > > > > > > >> > > > > > > are unable to contact ZK for a certain time
> > >> > period.
> > >> > > > > This
> > >> > > > > > > >> prevents
> > >> > > > > > > >> > > the
> > >> > > > > > > >> > > > > case
> > >> > > > > > > >> > > > > > > where a given partition has been moved off
> > some
> > >> > set
> > >> > > of
> > >> > > > > > nodes,
> > >> > > > > > > >> but a
> > >> > > > > > > >> > > > > client
> > >> > > > > > > >> > > > > > > still ends up talking to those nodes and
> > >> writing
> > >> > > data
> > >> > > > > > there.
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > For the second purpose, this is "soft
> state"
> > >> > anyway.
> > >> > > > If
> > >> > > > > > the
> > >> > > > > > > >> client
> > >> > > > > > > >> > > > > thinks
> > >> > > > > > > >> > > > > > > X is the leader but Y is really the leader,
> > the
> > >> > > client
> > >> > > > > > will
> > >> > > > > > > >> talk
> > >> > > > > > > >> > > to X,
> > >> > > > > > > >> > > > > and
> > >> > > > > > > >> > > > > > > X will point out its mistake by sending
> back
> > a
> > >> > > > > > > >> > > > > NOT_LEADER_FOR_PARTITION.
> > >> > > > > > > >> > > > > > > Then the client can update its metadata
> again
> > >> and
> > >> > > find
> > >> > > > > > the new
> > >> > > > > > > >> > > leader,
> > >> > > > > > > >> > > > > if
> > >> > > > > > > >> > > > > > > there is one.  There is no need for an
> epoch
> > to
> > >> > > handle
> > >> > > > > > this.
> > >> > > > > > > >> > > > > Similarly, I
> > >> > > > > > > >> > > > > > > can't think of a reason why changing the
> > >> in-sync
> > >> > > > replica
> > >> > > > > > set
> > >> > > > > > > >> needs
> > >> > > > > > > >> > > to
> > >> > > > > > > >> > > > > bump
> > >> > > > > > > >> > > > > > > the epoch.
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > best,
> > >> > > > > > > >> > > > > > > Colin
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin
> > wrote:
> > >> > > > > > > >> > > > > > > > Thanks much for reviewing the KIP!
> > >> > > > > > > >> > > > > > > >
> > >> > > > > > > >> > > > > > > > Dong
> > >> > > > > > > >> > > > > > > >
> > >> > > > > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang
> > >> Wang <
> > >> > > > > > > >> > > wangg...@gmail.com>
> > >> > > > > > > >> > > > > > > wrote:
> > >> > > > > > > >> > > > > > > >
> > >> > > > > > > >> > > > > > > > > Yeah that makes sense, again I'm just
> > >> making
> > >> > > sure
> > >> > > > we
> > >> > > > > > > >> understand
> > >> > > > > > > >> > > > > all the
> > >> > > > > > > >> > > > > > > > > scenarios and what to expect.
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > > I agree that if, more generally
> speaking,
> > >> say
> > >> > > > users
> > >> > > > > > have
> > >> > > > > > > >> only
> > >> > > > > > > >> > > > > consumed
> > >> > > > > > > >> > > > > > > to
> > >> > > > > > > >> > > > > > > > > offset 8, and then call seek(16) to
> > "jump"
> > >> to
> > >> > a
> > >> > > > > > further
> > >> > > > > > > >> > > position,
> > >> > > > > > > >> > > > > then
> > >> > > > > > > >> > > > > > > she
> > >> > > > > > > >> > > > > > > > > needs to be aware that OORE may be
> thrown
> > >> and
> > >> > she
> > >> > > > > > needs to
> > >> > > > > > > >> > > handle
> > >> > > > > > > >> > > > > it or
> > >> > > > > > > >> > > > > > > rely
> > >> > > > > > > >> > > > > > > > > on reset policy which should not
> surprise
> > >> her.
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > > I'm +1 on the KIP.
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > > Guozhang
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong
> > Lin
> > >> <
> > >> > > > > > > >> > > lindon...@gmail.com>
> > >> > > > > > > >> > > > > > > wrote:
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > Yes, in general we can not prevent
> > >> > > > > > > >> OffsetOutOfRangeException
> > >> > > > > > > >> > > if
> > >> > > > > > > >> > > > > user
> > >> > > > > > > >> > > > > > > > > seeks
> > >> > > > > > > >> > > > > > > > > > to a wrong offset. The main goal is
> to
> > >> > prevent
> > >> > > > > > > >> > > > > > > OffsetOutOfRangeException
> > >> > > > > > > >> > > > > > > > > if
> > >> > > > > > > >> > > > > > > > > > user has done things in the right
> way,
> > >> e.g.
> > >> > > user
> > >> > > > > > should
> > >> > > > > > > >> know
> > >> > > > > > > >> > > that
> > >> > > > > > > >> > > > > > > there
> > >> > > > > > > >> > > > > > > > > is
> > >> > > > > > > >> > > > > > > > > > message with this offset.
> > >> > > > > > > >> > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > For example, if user calls seek(..)
> > right
> > >> > > after
> > >> > > > > > > >> > > construction, the
> > >> > > > > > > >> > > > > > > only
> > >> > > > > > > >> > > > > > > > > > reason I can think of is that user
> > stores
> > >> > > offset
> > >> > > > > > > >> externally.
> > >> > > > > > > >> > > In
> > >> > > > > > > >> > > > > this
> > >> > > > > > > >> > > > > > > > > case,
> > >> > > > > > > >> > > > > > > > > > user currently needs to use the
> offset
> > >> which
> > >> > > is
> > >> > > > > > obtained
> > >> > > > > > > >> > > using
> > >> > > > > > > >> > > > > > > > > position(..)
> > >> > > > > > > >> > > > > > > > > > from the last run. With this KIP,
> user
> > >> needs
> > >> > > to
> > >> > > > > get
> > >> > > > > > the
> > >> > > > > > > >> > > offset
> > >> > > > > > > >> > > > > and
> > >> > > > > > > >> > > > > > > the
> > >> > > > > > > >> > > > > > > > > > offsetEpoch using
> > >> > positionAndOffsetEpoch(...)
> > >> > > > and
> > >> > > > > > stores
> > >> > > > > > > >> > > these
> > >> > > > > > > >> > > > > > > > > information
> > >> > > > > > > >> > > > > > > > > > externally. The next time user starts
> > >> > > consumer,
> > >> > > > > > he/she
> > >> > > > > > > >> needs
> > >> > > > > > > >> > > to
> > >> > > > > > > >> > > > > call
> > >> > > > > > > >> > > > > > > > > > seek(..., offset, offsetEpoch) right
> > >> after
> > >> > > > > > construction.
> > >> > > > > > > >> > > Then KIP
> > >> > > > > > > >> > > > > > > should
> > >> > > > > > > >> > > > > > > > > be
> > >> > > > > > > >> > > > > > > > > > able to ensure that we don't throw
> > >> > > > > > > >> OffsetOutOfRangeException
> > >> > > > > > > >> > > if
> > >> > > > > > > >> > > > > > > there is
> > >> > > > > > > >> > > > > > > > > no
> > >> > > > > > > >> > > > > > > > > > unclean leader election.
> > >> > > > > > > >> > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > Does this sound OK?
> > >> > > > > > > >> > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > Regards,
> > >> > > > > > > >> > > > > > > > > > Dong
> > >> > > > > > > >> > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM,
> > >> Guozhang
> > >> > > Wang
> > >> > > > <
> > >> > > > > > > >> > > > > wangg...@gmail.com>
> > >> > > > > > > >> > > > > > > > > > wrote:
> > >> > > > > > > >> > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > "If consumer wants to consume
> message
> > >> with
> > >> > > > > offset
> > >> > > > > > 16,
> > >> > > > > > > >> then
> > >> > > > > > > >> > > > > consumer
> > >> > > > > > > >> > > > > > > > > must
> > >> > > > > > > >> > > > > > > > > > > have
> > >> > > > > > > >> > > > > > > > > > > already fetched message with offset
> > 15"
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > --> this may not be always true
> > right?
> > >> > What
> > >> > > if
> > >> > > > > > > >> consumer
> > >> > > > > > > >> > > just
> > >> > > > > > > >> > > > > call
> > >> > > > > > > >> > > > > > > > > > seek(16)
> > >> > > > > > > >> > > > > > > > > > > after construction and then poll
> > >> without
> > >> > > > > committed
> > >> > > > > > > >> offset
> > >> > > > > > > >> > > ever
> > >> > > > > > > >> > > > > > > stored
> > >> > > > > > > >> > > > > > > > > > > before? Admittedly it is rare but
> we
> > do
> > >> > not
> > >> > > > > > > >> programmatically
> > >> > > > > > > >> > > > > disallow
> > >> > > > > > > >> > > > > > > it.
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > Guozhang
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM,
> > Dong
> > >> > Lin <
> > >> > > > > > > >> > > > > lindon...@gmail.com>
> > >> > > > > > > >> > > > > > > > > wrote:
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > Hey Guozhang,
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > Thanks much for reviewing the
> KIP!
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > In the scenario you described,
> > let's
> > >> > > assume
> > >> > > > > that
> > >> > > > > > > >> broker
> > >> > > > > > > >> > > A has
> > >> > > > > > > >> > > > > > > > > messages
> > >> > > > > > > >> > > > > > > > > > > with
> > >> > > > > > > >> > > > > > > > > > > > offset up to 10, and broker B has
> > >> > messages
> > >> > > > > with
> > >> > > > > > > >> offset
> > >> > > > > > > >> > > up to
> > >> > > > > > > >> > > > > 20.
> > >> > > > > > > >> > > > > > > If
> > >> > > > > > > >> > > > > > > > > > > > consumer wants to consume message
> > >> with
> > >> > > > offset
> > >> > > > > > 9, it
> > >> > > > > > > >> will
> > >> > > > > > > >> > > not
> > >> > > > > > > >> > > > > > > receive
> > >> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException
> > >> > > > > > > >> > > > > > > > > > > > from broker A.
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > If consumer wants to consume
> > message
> > >> > with
> > >> > > > > offset
> > >> > > > > > > >> 16, then
> > >> > > > > > > >> > > > > > > consumer
> > >> > > > > > > >> > > > > > > > > must
> > >> > > > > > > >> > > > > > > > > > > > have already fetched message with
> > >> offset
> > >> > > 15,
> > >> > > > > > which
> > >> > > > > > > >> can
> > >> > > > > > > >> > > only
> > >> > > > > > > >> > > > > come
> > >> > > > > > > >> > > > > > > from
> > >> > > > > > > >> > > > > > > > > > > > broker B. Because consumer will
> > fetch
> > >> > from
> > >> > > > > > broker B
> > >> > > > > > > >> only
> > >> > > > > > > >> > > if
> > >> > > > > > > >> > > > > > > > > leaderEpoch
> > >> > > > > > > >> > > > > > > > > > > >=
> > >> > > > > > > >> > > > > > > > > > > > 2, then the current consumer
> > >> leaderEpoch
> > >> > > can
> > >> > > > > > not be
> > >> > > > > > > >> 1
> > >> > > > > > > >> > > since
> > >> > > > > > > >> > > > > this
> > >> > > > > > > >> > > > > > > KIP
> > >> > > > > > > >> > > > > > > > > > > > prevents leaderEpoch rewind. Thus
> > we
> > >> > will
> > >> > > > not
> > >> > > > > > have
> > >> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException
> > >> > > > > > > >> > > > > > > > > > > > in this case.
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > Does this address your question,
> or
> > >> > maybe
> > >> > > > > there
> > >> > > > > > is
> > >> > > > > > > >> more
> > >> > > > > > > >> > > > > advanced
> > >> > > > > > > >> > > > > > > > > > scenario
> > >> > > > > > > >> > > > > > > > > > > > that the KIP does not handle?
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > Thanks,
> > >> > > > > > > >> > > > > > > > > > > > Dong
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM,
> > >> > Guozhang
> > >> > > > > Wang <
> > >> > > > > > > >> > > > > > > wangg...@gmail.com>
> > >> > > > > > > >> > > > > > > > > > > wrote:
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over
> > the
> > >> > wiki
> > >> > > > and
> > >> > > > > > it
> > >> > > > > > > >> lgtm.
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > Just a quick question: can we
> > >> > completely
> > >> > > > > > > >> eliminate the
> > >> > > > > > > >> > > > > > > > > > > > > OffsetOutOfRangeException with
> > this
> > >> > > > > approach?
> > >> > > > > > Say
> > >> > > > > > > >> if
> > >> > > > > > > >> > > there
> > >> > > > > > > >> > > > > is
> > >> > > > > > > >> > > > > > > > > > > consecutive
> > >> > > > > > > >> > > > > > > > > > > > > leader changes such that the
> > cached
> > >> > > > > metadata's
> > >> > > > > > > >> > > partition
> > >> > > > > > > >> > > > > epoch
> > >> > > > > > > >> > > > > > > is
> > >> > > > > > > >> > > > > > > > > 1,
> > >> > > > > > > >> > > > > > > > > > > and
> > >> > > > > > > >> > > > > > > > > > > > > the metadata fetch response
> > returns
> > >> > > with
> > >> > > > > > > >> partition
> > >> > > > > > > >> > > epoch 2
> > >> > > > > > > >> > > > > > > > > pointing
> > >> > > > > > > >> > > > > > > > > > to
> > >> > > > > > > >> > > > > > > > > > > > > leader broker A, while the
> actual
> > >> > > > up-to-date
> > >> > > > > > > >> metadata
> > >> > > > > > > >> > > has
> > >> > > > > > > >> > > > > > > partition
> > >> > > > > > > >> > > > > > > > > > > > epoch 3
> > >> > > > > > > >> > > > > > > > > > > > > whose leader is now broker B,
> the
> > >> > > metadata
> > >> > > > > > > >> refresh will
> > >> > > > > > > >> > > > > still
> > >> > > > > > > >> > > > > > > > > succeed
> > >> > > > > > > >> > > > > > > > > > > and
> > >> > > > > > > >> > > > > > > > > > > > > the follow-up fetch request may
> > >> still
> > >> > > see
> > >> > > > > > OORE?
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > Guozhang
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47
> PM,
> > >> Dong
> > >> > > Lin
> > >> > > > <
> > >> > > > > > > >> > > > > lindon...@gmail.com
> > >> > > > > > > >> > > > > > > >
> > >> > > > > > > >> > > > > > > > > > wrote:
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > > Hi all,
> > >> > > > > > > >> > > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > > I would like to start the
> > voting
> > >> > > process
> > >> > > > > for
> > >> > > > > > > >> KIP-232:
> > >> > > > > > > >> > > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/
> > >> > > > > > > >> > > confluence/display/KAFKA/KIP-
> > >> > > > > > > >> > > > > > > > > > > > > >
> 232%3A+Detect+outdated+metadat
> > >> > > > > > > >> a+using+leaderEpoch+
> > >> > > > > > > >> > > > > > > > > > and+partitionEpoch
> > >> > > > > > > >> > > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > > The KIP will help fix a
> > >> concurrency
> > >> > > > issue
> > >> > > > > in
> > >> > > > > > > >> Kafka
> > >> > > > > > > >> > > which
> > >> > > > > > > >> > > > > > > > > currently
> > >> > > > > > > >> > > > > > > > > > > can
> > >> > > > > > > >> > > > > > > > > > > > > > cause message loss or message
> > >> > > > duplication
> > >> > > > > in
> > >> > > > > > > >> > > consumer.
> > >> > > > > > > >> > > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > > Regards,
> > >> > > > > > > >> > > > > > > > > > > > > > Dong
> > >> > > > > > > >> > > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > > > --
> > >> > > > > > > >> > > > > > > > > > > > > -- Guozhang
> > >> > > > > > > >> > > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > > > --
> > >> > > > > > > >> > > > > > > > > > > -- Guozhang
> > >> > > > > > > >> > > > > > > > > > >
> > >> > > > > > > >> > > > > > > > > >
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > > > > --
> > >> > > > > > > >> > > > > > > > > -- Guozhang
> > >> > > > > > > >> > > > > > > > >
> > >> > > > > > > >> > > > > > >
> > >> > > > > > > >> > > > >
> > >> > > > > > > >> > >
> > >> > > > > > > >>
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
