Hey Matthias,

Thanks for checking back  on the status. The KIP has been marked as
replaced by KIP-320 in the KIP list wiki page and the status has been
updated in the discussion and voting email thread.

Thanks,
Dong

On Sun, 30 Sep 2018 at 11:51 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> It seems that KIP-320 was accepted. Thus, I am wondering what the status
> of this KIP is?
>
> -Matthias
>
> On 7/11/18 10:59 AM, Dong Lin wrote:
> > Hey Jun,
> >
> > Certainly. We can discuss later after KIP-320 settles.
> >
> > Thanks!
> > Dong
> >
> >
> > On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote:
> >
> >> Hi, Dong,
> >>
> >> Sorry for the late response. Since KIP-320 is covering some of the
> similar
> >> problems described in this KIP, perhaps we can wait until KIP-320
> settles
> >> and see what's still left uncovered in this KIP.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >>> Hey Jun,
> >>>
> >>> It seems that we have made considerable progress on the discussion of
> >>> KIP-253 since February. Do you think we should continue the discussion
> >>> there, or can we continue the voting for this KIP? I am happy to submit
> >> the
> >>> PR and move forward the progress for this KIP.
> >>>
> >>> Thanks!
> >>> Dong
> >>>
> >>>
> >>> On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>>
> >>>> Hey Jun,
> >>>>
> >>>> Sure, I will come up with a KIP this week. I think there is a way to
> >>> allow
> >>>> partition expansion to arbitrary number without introducing new
> >> concepts
> >>>> such as read-only partition or repartition epoch.
> >>>>
> >>>> Thanks,
> >>>> Dong
> >>>>
> >>>> On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote:
> >>>>
> >>>>> Hi, Dong,
> >>>>>
> >>>>> Thanks for the reply. The general idea that you had for adding
> >>> partitions
> >>>>> is similar to what we had in mind. It would be useful to make this
> >> more
> >>>>> general, allowing adding an arbitrary number of partitions (instead
> of
> >>>>> just
> >>>>> doubling) and potentially removing partitions as well. The following
> >> is
> >>>>> the
> >>>>> high level idea from the discussion with Colin, Jason and Ismael.
> >>>>>
> >>>>> * To change the number of partitions from X to Y in a topic, the
> >>>>> controller
> >>>>> marks all existing X partitions as read-only and creates Y new
> >>> partitions.
> >>>>> The new partitions are writable and are tagged with a higher
> >> repartition
> >>>>> epoch (RE).
> >>>>>
> >>>>> * The controller propagates the new metadata to every broker. Once
> the
> >>>>> leader of a partition is marked as read-only, it rejects the produce
> >>>>> requests on this partition. The producer will then refresh the
> >> metadata
> >>>>> and
> >>>>> start publishing to the new writable partitions.
> >>>>>
> >>>>> * The consumers will then be consuming messages in RE order. The
> >>> consumer
> >>>>> coordinator will only assign partitions in the same RE to consumers.
> >>> Only
> >>>>> after all messages in an RE are consumed, will partitions in a higher
> >> RE
> >>>>> be
> >>>>> assigned to consumers.
> >>>>>
> >>>>> As Colin mentioned, if we do the above, we could potentially (1) use
> a
> >>>>> globally unique partition id, or (2) use a globally unique topic id
> to
> >>>>> distinguish recreated partitions due to topic deletion.
> >>>>>
> >>>>> So, perhaps we can sketch out the re-partitioning KIP a bit more and
> >> see
> >>>>> if
> >>>>> there is any overlap with KIP-232. Would you be interested in doing
> >>> that?
> >>>>> If not, we can do that next week.
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>>
> >>>>> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Hey Jun,
> >>>>>>
> >>>>>> Interestingly I am also planning to sketch a KIP to allow partition
> >>>>>> expansion for keyed topics after this KIP. Since you are already
> >> doing
> >>>>>> that, I guess I will just share my high level idea here in case it
> >> is
> >>>>>> helpful.
> >>>>>>
> >>>>>> The motivation for the KIP is that we currently lose order guarantee
> >>> for
> >>>>>> messages with the same key if we expand partitions of keyed topic.
> >>>>>>
> >>>>>> The solution can probably be built upon the following ideas:
> >>>>>>
> >>>>>> - Partition number of the keyed topic should always be doubled (or
> >>>>>> multiplied by power of 2). Given that we select a partition based on
> >>>>>> hash(key) % partitionNum, this should help us ensure that, a message
> >>>>>> assigned to an existing partition will not be mapped to another
> >>> existing
> >>>>>> partition after partition expansion.
> >>>>>>
> >>>>>> - Producer includes in the ProduceRequest some information that
> >> helps
> >>>>>> ensure that messages produced ti a partition will monotonically
> >>>>> increase in
> >>>>>> the partitionNum of the topic. In other words, if broker receives a
> >>>>>> ProduceRequest and notices that the producer does not know the
> >>> partition
> >>>>>> number has increased, broker should reject this request. That
> >>>>> "information"
> >>>>>> maybe leaderEpoch, max partitionEpoch of the partitions of the
> >> topic,
> >>> or
> >>>>>> simply partitionNum of the topic. The benefit of this property is
> >> that
> >>>>> we
> >>>>>> can keep the new logic for in-order message consumption entirely in
> >>> how
> >>>>>> consumer leader determines the partition -> consumer mapping.
> >>>>>>
> >>>>>> - When consumer leader determines partition -> consumer mapping,
> >>> leader
> >>>>>> first reads the start position for each partition using
> >>>>> OffsetFetchRequest.
> >>>>>> If start position are all non-zero, then assignment can be done in
> >> its
> >>>>>> current manner. The assumption is that, a message in the new
> >> partition
> >>>>>> should only be consumed after all messages with the same key
> >> produced
> >>>>>> before it has been consumed. Since some messages in the new
> >> partition
> >>>>> has
> >>>>>> been consumed, we should not worry about consuming messages
> >>>>> out-of-order.
> >>>>>> This benefit of this approach is that we can avoid unnecessary
> >>> overhead
> >>>>> in
> >>>>>> the common case.
> >>>>>>
> >>>>>> - If the consumer leader finds that the start position for some
> >>>>> partition
> >>>>>> is 0. Say the current partition number is 18 and the partition index
> >>> is
> >>>>> 12,
> >>>>>> then consumer leader should ensure that messages produced to
> >> partition
> >>>>> 12 -
> >>>>>> 18/2 = 3 before the first message of partition 12 is consumed,
> >> before
> >>> it
> >>>>>> assigned partition 12 to any consumer in the consumer group. Since
> >> we
> >>>>> have
> >>>>>> a "information" that is monotonically increasing per partition,
> >>> consumer
> >>>>>> can read the value of this information from the first message in
> >>>>> partition
> >>>>>> 12, get the offset corresponding to this value in partition 3,
> >> assign
> >>>>>> partition except for partition 12 (and probably other new
> >> partitions)
> >>> to
> >>>>>> the existing consumers, waiting for the committed offset to go
> >> beyond
> >>>>> this
> >>>>>> offset for partition 3, and trigger rebalance again so that
> >> partition
> >>> 3
> >>>>> can
> >>>>>> be reassigned to some consumer.
> >>>>>>
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Dong
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:
> >>>>>>
> >>>>>>> Hi, Dong,
> >>>>>>>
> >>>>>>> Thanks for the KIP. It looks good overall. We are working on a
> >>>>> separate
> >>>>>> KIP
> >>>>>>> for adding partitions while preserving the ordering guarantees.
> >> That
> >>>>> may
> >>>>>>> require another flavor of partition epoch. It's not very clear
> >>> whether
> >>>>>> that
> >>>>>>> partition epoch can be merged with the partition epoch in this
> >> KIP.
> >>>>> So,
> >>>>>>> perhaps you can wait on this a bit until we post the other KIP in
> >>> the
> >>>>>> next
> >>>>>>> few days.
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> +1 on the KIP.
> >>>>>>>>
> >>>>>>>> I think the KIP is mainly about adding the capability of
> >> tracking
> >>>>> the
> >>>>>>>> system state change lineage. It does not seem necessary to
> >> bundle
> >>>>> this
> >>>>>>> KIP
> >>>>>>>> with replacing the topic partition with partition epoch in
> >>>>>> produce/fetch.
> >>>>>>>> Replacing topic-partition string with partition epoch is
> >>>>> essentially a
> >>>>>>>> performance improvement on top of this KIP. That can probably be
> >>>>> done
> >>>>>>>> separately.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>
> >>>>>>>> On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com
> >>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hey Colin,
> >>>>>>>>>
> >>>>>>>>> On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <
> >>>>> cmcc...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>>> On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <
> >>>>> lindon...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I understand that the KIP will adds overhead by
> >>> introducing
> >>>>>>>>>> per-partition
> >>>>>>>>>>>> partitionEpoch. I am open to alternative solutions that
> >>> does
> >>>>>> not
> >>>>>>>>> incur
> >>>>>>>>>>>> additional overhead. But I don't see a better way now.
> >>>>>>>>>>>>
> >>>>>>>>>>>> IMO the overhead in the FetchResponse may not be that
> >>> much.
> >>>>> We
> >>>>>>>>> probably
> >>>>>>>>>>>> should discuss the percentage increase rather than the
> >>>>> absolute
> >>>>>>>>> number
> >>>>>>>>>>>> increase. Currently after KIP-227, per-partition header
> >>> has
> >>>>> 23
> >>>>>>>> bytes.
> >>>>>>>>>> This
> >>>>>>>>>>>> KIP adds another 4 bytes. Assume the records size is
> >> 10KB,
> >>>>> the
> >>>>>>>>>> percentage
> >>>>>>>>>>>> increase is 4 / (23 + 10000) = 0.03%. It seems
> >> negligible,
> >>>>>> right?
> >>>>>>>>>>
> >>>>>>>>>> Hi Dong,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the response.  I agree that the FetchRequest /
> >>>>>>> FetchResponse
> >>>>>>>>>> overhead should be OK, now that we have incremental fetch
> >>>>> requests
> >>>>>>> and
> >>>>>>>>>> responses.  However, there are a lot of cases where the
> >>>>> percentage
> >>>>>>>>> increase
> >>>>>>>>>> is much greater.  For example, if a client is doing full
> >>>>>>>>> MetadataRequests /
> >>>>>>>>>> Responses, we have some math kind of like this per
> >> partition:
> >>>>>>>>>>
> >>>>>>>>>>> UpdateMetadataRequestPartitionState => topic partition
> >>>>>>>>> controller_epoch
> >>>>>>>>>> leader  leader_epoch partition_epoch isr zk_version replicas
> >>>>>>>>>> offline_replicas
> >>>>>>>>>>> 14 bytes:  topic => string (assuming about 10 byte topic
> >>>>> names)
> >>>>>>>>>>> 4 bytes:  partition => int32
> >>>>>>>>>>> 4  bytes: conroller_epoch => int32
> >>>>>>>>>>> 4  bytes: leader => int32
> >>>>>>>>>>> 4  bytes: leader_epoch => int32
> >>>>>>>>>>> +4 EXTRA bytes: partition_epoch => int32        <-- NEW
> >>>>>>>>>>> 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
> >>>>>>>>>>> 4 bytes: zk_version => int32
> >>>>>>>>>>> 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
> >>>>>>>>>>> 2  offline_replicas => [int32] (assuming no offline
> >>> replicas)
> >>>>>>>>>>
> >>>>>>>>>> Assuming I added that up correctly, the per-partition
> >> overhead
> >>>>> goes
> >>>>>>>> from
> >>>>>>>>>> 64 bytes per partition to 68, a 6.2% increase.
> >>>>>>>>>>
> >>>>>>>>>> We could do similar math for a lot of the other RPCs.  And
> >> you
> >>>>> will
> >>>>>>>> have
> >>>>>>>>> a
> >>>>>>>>>> similar memory and garbage collection impact on the brokers
> >>>>> since
> >>>>>> you
> >>>>>>>>> have
> >>>>>>>>>> to store all this extra state as well.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> That is correct. IMO the Metadata is only updated periodically
> >>>>> and is
> >>>>>>>>> probably not a big deal if we increase it by 6%. The
> >>> FetchResponse
> >>>>>> and
> >>>>>>>>> ProduceRequest are probably the only requests that are bounded
> >>> by
> >>>>> the
> >>>>>>>>> bandwidth throughput.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> I agree that we can probably save more space by using
> >>>>> partition
> >>>>>>> ID
> >>>>>>>> so
> >>>>>>>>>> that
> >>>>>>>>>>>> we no longer needs the string topic name. The similar
> >> idea
> >>>>> has
> >>>>>>> also
> >>>>>>>>>> been
> >>>>>>>>>>>> put in the Rejected Alternative section in KIP-227.
> >> While
> >>>>> this
> >>>>>>> idea
> >>>>>>>>> is
> >>>>>>>>>>>> promising, it seems orthogonal to the goal of this KIP.
> >>>>> Given
> >>>>>>> that
> >>>>>>>>>> there is
> >>>>>>>>>>>> already many work to do in this KIP, maybe we can do the
> >>>>>>> partition
> >>>>>>>> ID
> >>>>>>>>>> in a
> >>>>>>>>>>>> separate KIP?
> >>>>>>>>>>
> >>>>>>>>>> I guess my thinking is that the goal here is to replace an
> >>>>>> identifier
> >>>>>>>>>> which can be re-used (the tuple of topic name, partition ID)
> >>>>> with
> >>>>>> an
> >>>>>>>>>> identifier that cannot be re-used (the tuple of topic name,
> >>>>>> partition
> >>>>>>>> ID,
> >>>>>>>>>> partition epoch) in order to gain better semantics.  As long
> >>> as
> >>>>> we
> >>>>>>> are
> >>>>>>>>>> replacing the identifier, why not replace it with an
> >>> identifier
> >>>>>> that
> >>>>>>>> has
> >>>>>>>>>> important performance advantages?  The KIP freeze for the
> >> next
> >>>>>>> release
> >>>>>>>>> has
> >>>>>>>>>> already passed, so there is time to do this.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> In general it can be easier for discussion and implementation
> >> if
> >>>>> we
> >>>>>> can
> >>>>>>>>> split a larger task into smaller and independent tasks. For
> >>>>> example,
> >>>>>>>>> KIP-112 and KIP-113 both deals with the JBOD support. KIP-31,
> >>>>> KIP-32
> >>>>>>> and
> >>>>>>>>> KIP-33 are about timestamp support. The option on this can be
> >>>>> subject
> >>>>>>>>> though.
> >>>>>>>>>
> >>>>>>>>> IMO the change to switch from (topic, partition ID) to
> >>>>> partitionEpch
> >>>>>> in
> >>>>>>>> all
> >>>>>>>>> request/response requires us to going through all request one
> >> by
> >>>>> one.
> >>>>>>> It
> >>>>>>>>> may not be hard but it can be time consuming and tedious. At
> >>> high
> >>>>>> level
> >>>>>>>> the
> >>>>>>>>> goal and the change for that will be orthogonal to the changes
> >>>>>> required
> >>>>>>>> in
> >>>>>>>>> this KIP. That is the main reason I think we can split them
> >> into
> >>>>> two
> >>>>>>>> KIPs.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:
> >>>>>>>>>>> I think it is possible to move to entirely use
> >>> partitionEpoch
> >>>>>>> instead
> >>>>>>>>> of
> >>>>>>>>>>> (topic, partition) to identify a partition. Client can
> >>> obtain
> >>>>> the
> >>>>>>>>>>> partitionEpoch -> (topic, partition) mapping from
> >>>>>> MetadataResponse.
> >>>>>>>> We
> >>>>>>>>>>> probably need to figure out a way to assign partitionEpoch
> >>> to
> >>>>>>>> existing
> >>>>>>>>>>> partitions in the cluster. But this should be doable.
> >>>>>>>>>>>
> >>>>>>>>>>> This is a good idea. I think it will save us some space in
> >>> the
> >>>>>>>>>>> request/response. The actual space saving in percentage
> >>>>> probably
> >>>>>>>>> depends
> >>>>>>>>>> on
> >>>>>>>>>>> the amount of data and the number of partitions of the
> >> same
> >>>>>> topic.
> >>>>>>> I
> >>>>>>>>> just
> >>>>>>>>>>> think we can do it in a separate KIP.
> >>>>>>>>>>
> >>>>>>>>>> Hmm.  How much extra work would be required?  It seems like
> >> we
> >>>>> are
> >>>>>>>>> already
> >>>>>>>>>> changing almost every RPC that involves topics and
> >> partitions,
> >>>>>>> already
> >>>>>>>>>> adding new per-partition state to ZooKeeper, already
> >> changing
> >>>>> how
> >>>>>>>> clients
> >>>>>>>>>> interact with partitions.  Is there some other big piece of
> >>> work
> >>>>>> we'd
> >>>>>>>>> have
> >>>>>>>>>> to do to move to partition IDs that we wouldn't need for
> >>>>> partition
> >>>>>>>>> epochs?
> >>>>>>>>>> I guess we'd have to find a way to support regular
> >>>>> expression-based
> >>>>>>>> topic
> >>>>>>>>>> subscriptions.  If we split this into multiple KIPs,
> >> wouldn't
> >>> we
> >>>>>> end
> >>>>>>> up
> >>>>>>>>>> changing all that RPCs and ZK state a second time?  Also,
> >> I'm
> >>>>>> curious
> >>>>>>>> if
> >>>>>>>>>> anyone has done any proof of concept GC, memory, and network
> >>>>> usage
> >>>>>>>>>> measurements on switching topic names for topic IDs.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> We will need to go over all requests/responses to check how to
> >>>>>> replace
> >>>>>>>>> (topic, partition ID) with partition epoch. It requires
> >>>>> non-trivial
> >>>>>>> work
> >>>>>>>>> and could take time. As you mentioned, we may want to see how
> >>> much
> >>>>>>> saving
> >>>>>>>>> we can get by switching from topic names to partition epoch.
> >>> That
> >>>>>>> itself
> >>>>>>>>> requires time and experiment. It seems that the new idea does
> >>> not
> >>>>>>>> rollback
> >>>>>>>>> any change proposed in this KIP. So I am not sure we can get
> >>> much
> >>>>> by
> >>>>>>>>> putting them into the same KIP.
> >>>>>>>>>
> >>>>>>>>> Anyway, if more people are interested in seeing the new idea
> >> in
> >>>>> the
> >>>>>>> same
> >>>>>>>>> KIP, I can try that.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> best,
> >>>>>>>>>> Colin
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <
> >>>>>>> cmcc...@apache.org
> >>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:
> >>>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <
> >>>>>>>>> cmcc...@apache.org>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
> >>>>>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the comment.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <
> >>>>>>>>>> cmcc...@apache.org>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> >>>>>>>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for reviewing the KIP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If I understand you right, you maybe
> >> suggesting
> >>>>> that
> >>>>>>> we
> >>>>>>>>> can
> >>>>>>>>>> use
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>> global
> >>>>>>>>>>>>>>>>>> metadataEpoch that is incremented every time
> >>>>>>> controller
> >>>>>>>>>> updates
> >>>>>>>>>>>>>>> metadata.
> >>>>>>>>>>>>>>>>>> The problem with this solution is that, if a
> >>>>> topic
> >>>>>> is
> >>>>>>>>>> deleted
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> created
> >>>>>>>>>>>>>>>>>> again, user will not know whether that the
> >>> offset
> >>>>>>> which
> >>>>>>>> is
> >>>>>>>>>>>>> stored
> >>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>> the topic deletion is no longer valid. This
> >>>>>> motivates
> >>>>>>>> the
> >>>>>>>>>> idea
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> include
> >>>>>>>>>>>>>>>>>> per-partition partitionEpoch. Does this sound
> >>>>>>>> reasonable?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Perhaps we can store the last valid offset of
> >>> each
> >>>>>>> deleted
> >>>>>>>>>> topic
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> ZooKeeper.  Then, when a topic with one of
> >> those
> >>>>> names
> >>>>>>>> gets
> >>>>>>>>>>>>>>> re-created, we
> >>>>>>>>>>>>>>>>> can start the topic at the previous end offset
> >>>>> rather
> >>>>>>> than
> >>>>>>>>> at
> >>>>>>>>>> 0.
> >>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>> preserves immutability.  It is no more
> >> burdensome
> >>>>> than
> >>>>>>>>> having
> >>>>>>>>>> to
> >>>>>>>>>>>>>>> preserve a
> >>>>>>>>>>>>>>>>> "last epoch" for the deleted partition
> >> somewhere,
> >>>>>> right?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> My concern with this solution is that the number
> >> of
> >>>>>>>> zookeeper
> >>>>>>>>>> nodes
> >>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>> more and more over time if some users keep
> >> deleting
> >>>>> and
> >>>>>>>>> creating
> >>>>>>>>>>>>> topics.
> >>>>>>>>>>>>>>> Do
> >>>>>>>>>>>>>>>> you think this can be a problem?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> We could expire the "partition tombstones" after an
> >>>>> hour
> >>>>>> or
> >>>>>>>> so.
> >>>>>>>>>> In
> >>>>>>>>>>>>>>> practice this would solve the issue for clients
> >> that
> >>>>> like
> >>>>>> to
> >>>>>>>>>> destroy
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> re-create topics all the time.  In any case,
> >> doesn't
> >>>>> the
> >>>>>>>> current
> >>>>>>>>>>>>> proposal
> >>>>>>>>>>>>>>> add per-partition znodes as well that we have to
> >>> track
> >>>>>> even
> >>>>>>>>> after
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> partition is deleted?  Or did I misunderstand that?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Actually the current KIP does not add per-partition
> >>>>> znodes.
> >>>>>>>> Could
> >>>>>>>>>> you
> >>>>>>>>>>>>>> double check? I can fix the KIP wiki if there is
> >>> anything
> >>>>>>>>>> misleading.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I double-checked the KIP, and I can see that you are in
> >>>>> fact
> >>>>>>>> using a
> >>>>>>>>>>>>> global counter for initializing partition epochs.  So,
> >>> you
> >>>>> are
> >>>>>>>>>> correct, it
> >>>>>>>>>>>>> doesn't add per-partition znodes for partitions that no
> >>>>> longer
> >>>>>>>>> exist.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> If we expire the "partition tomstones" after an hour,
> >>> and
> >>>>>> the
> >>>>>>>>> topic
> >>>>>>>>>> is
> >>>>>>>>>>>>>> re-created after more than an hour since the topic
> >>>>> deletion,
> >>>>>>>> then
> >>>>>>>>>> we are
> >>>>>>>>>>>>>> back to the situation where user can not tell whether
> >>> the
> >>>>>>> topic
> >>>>>>>>> has
> >>>>>>>>>> been
> >>>>>>>>>>>>>> re-created or not, right?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Yes, with an expiration period, it would not ensure
> >>>>>>> immutability--
> >>>>>>>>> you
> >>>>>>>>>>>>> could effectively reuse partition names and they would
> >>> look
> >>>>>> the
> >>>>>>>>> same.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It's not really clear to me what should happen
> >> when a
> >>>>>> topic
> >>>>>>> is
> >>>>>>>>>>>>> destroyed
> >>>>>>>>>>>>>>> and re-created with new data.  Should consumers
> >>>>> continue
> >>>>>> to
> >>>>>>> be
> >>>>>>>>>> able to
> >>>>>>>>>>>>>>> consume?  We don't know where they stopped
> >> consuming
> >>>>> from
> >>>>>>> the
> >>>>>>>>>> previous
> >>>>>>>>>>>>>>> incarnation of the topic, so messages may have been
> >>>>> lost.
> >>>>>>>>>> Certainly
> >>>>>>>>>>>>>>> consuming data from offset X of the new incarnation
> >>> of
> >>>>> the
> >>>>>>>> topic
> >>>>>>>>>> may
> >>>>>>>>>>>>> give
> >>>>>>>>>>>>>>> something totally different from what you would
> >> have
> >>>>>> gotten
> >>>>>>>> from
> >>>>>>>>>>>>> offset X
> >>>>>>>>>>>>>>> of the previous incarnation of the topic.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> With the current KIP, if a consumer consumes a topic
> >>>>> based
> >>>>>> on
> >>>>>>>> the
> >>>>>>>>>> last
> >>>>>>>>>>>>>> remembered (offset, partitionEpoch, leaderEpoch), and
> >>> if
> >>>>> the
> >>>>>>>> topic
> >>>>>>>>>> is
> >>>>>>>>>>>>>> re-created, consume will throw
> >>>>>> InvalidPartitionEpochException
> >>>>>>>>>> because
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> previous partitionEpoch will be different from the
> >>>>> current
> >>>>>>>>>>>>> partitionEpoch.
> >>>>>>>>>>>>>> This is described in the Proposed Changes ->
> >>> Consumption
> >>>>>> after
> >>>>>>>>> topic
> >>>>>>>>>>>>>> deletion in the KIP. I can improve the KIP if there
> >> is
> >>>>>>> anything
> >>>>>>>>> not
> >>>>>>>>>>>>> clear.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the clarification.  It sounds like what you
> >>>>> really
> >>>>>>> want
> >>>>>>>>> is
> >>>>>>>>>>>>> immutability-- i.e., to never "really" reuse partition
> >>>>>>>> identifiers.
> >>>>>>>>>> And
> >>>>>>>>>>>>> you do this by making the partition name no longer the
> >>>>> "real"
> >>>>>>>>>> identifier.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> My big concern about this KIP is that it seems like an
> >>>>>>>>>> anti-scalability
> >>>>>>>>>>>>> feature.  Now we are adding 4 extra bytes for every
> >>>>> partition
> >>>>>> in
> >>>>>>>> the
> >>>>>>>>>>>>> FetchResponse and Request, for example.  That could be
> >> 40
> >>>>> kb
> >>>>>> per
> >>>>>>>>>> request,
> >>>>>>>>>>>>> if the user has 10,000 partitions.  And of course, the
> >>> KIP
> >>>>>> also
> >>>>>>>>> makes
> >>>>>>>>>>>>> massive changes to UpdateMetadataRequest,
> >>> MetadataResponse,
> >>>>>>>>>>>>> OffsetCommitRequest, OffsetFetchResponse,
> >>>>> LeaderAndIsrRequest,
> >>>>>>>>>>>>> ListOffsetResponse, etc. which will also increase their
> >>>>> size
> >>>>>> on
> >>>>>>>> the
> >>>>>>>>>> wire
> >>>>>>>>>>>>> and in memory.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> One thing that we talked a lot about in the past is
> >>>>> replacing
> >>>>>>>>>> partition
> >>>>>>>>>>>>> names with IDs.  IDs have a lot of really nice
> >> features.
> >>>>> They
> >>>>>>>> take
> >>>>>>>>>> up much
> >>>>>>>>>>>>> less space in memory than strings (especially 2-byte
> >> Java
> >>>>>>>> strings).
> >>>>>>>>>> They
> >>>>>>>>>>>>> can often be allocated on the stack rather than the
> >> heap
> >>>>>>>> (important
> >>>>>>>>>> when
> >>>>>>>>>>>>> you are dealing with hundreds of thousands of them).
> >>> They
> >>>>> can
> >>>>>>> be
> >>>>>>>>>>>>> efficiently deserialized and serialized.  If we use
> >>> 64-bit
> >>>>>> ones,
> >>>>>>>> we
> >>>>>>>>>> will
> >>>>>>>>>>>>> never run out of IDs, which means that they can always
> >> be
> >>>>>> unique
> >>>>>>>> per
> >>>>>>>>>>>>> partition.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Given that the partition name is no longer the "real"
> >>>>>> identifier
> >>>>>>>> for
> >>>>>>>>>>>>> partitions in the current KIP-232 proposal, why not
> >> just
> >>>>> move
> >>>>>> to
> >>>>>>>>> using
> >>>>>>>>>>>>> partition IDs entirely instead of strings?  You have to
> >>>>> change
> >>>>>>> all
> >>>>>>>>> the
> >>>>>>>>>>>>> messages anyway.  There isn't much point any more to
> >>>>> carrying
> >>>>>>>> around
> >>>>>>>>>> the
> >>>>>>>>>>>>> partition name in every RPC, since you really need
> >> (name,
> >>>>>> epoch)
> >>>>>>>> to
> >>>>>>>>>>>>> identify the partition.
> >>>>>>>>>>>>> Probably the metadata response and a few other messages
> >>>>> would
> >>>>>>> have
> >>>>>>>>> to
> >>>>>>>>>>>>> still carry the partition name, to allow clients to go
> >>> from
> >>>>>> name
> >>>>>>>> to
> >>>>>>>>>> id.
> >>>>>>>>>>>>> But we could mostly forget about the strings.  And then
> >>>>> this
> >>>>>>> would
> >>>>>>>>> be
> >>>>>>>>>> a
> >>>>>>>>>>>>> scalability improvement rather than a scalability
> >>> problem.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> By choosing to reuse the same (topic, partition,
> >>>>> offset)
> >>>>>>>>> 3-tuple,
> >>>>>>>>>> we
> >>>>>>>>>>>>> have
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> chosen to give up immutability.  That was a really
> >> bad
> >>>>>>> decision.
> >>>>>>>>>> And
> >>>>>>>>>>>>> now
> >>>>>>>>>>>>>>> we have to worry about time dependencies, stale
> >>> cached
> >>>>>> data,
> >>>>>>>> and
> >>>>>>>>>> all
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> rest.  We can't completely fix this inside Kafka no
> >>>>> matter
> >>>>>>>> what
> >>>>>>>>>> we do,
> >>>>>>>>>>>>>>> because not all that cached data is inside Kafka
> >>>>> itself.
> >>>>>>> Some
> >>>>>>>>> of
> >>>>>>>>>> it
> >>>>>>>>>>>>> may be
> >>>>>>>>>>>>>>> in systems that Kafka has sent data to, such as
> >> other
> >>>>>>> daemons,
> >>>>>>>>> SQL
> >>>>>>>>>>>>>>> databases, streams, and so forth.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The current KIP will uniquely identify a message
> >> using
> >>>>>> (topic,
> >>>>>>>>>>>>> partition,
> >>>>>>>>>>>>>> offset, partitionEpoch) 4-tuple. This addresses the
> >>>>> message
> >>>>>>>>>> immutability
> >>>>>>>>>>>>>> issue that you mentioned. Is there any corner case
> >>> where
> >>>>> the
> >>>>>>>>> message
> >>>>>>>>>>>>>> immutability is still not preserved with the current
> >>> KIP?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I guess the idea here is that mirror maker should
> >>> work
> >>>>> as
> >>>>>>>>> expected
> >>>>>>>>>>>>> when
> >>>>>>>>>>>>>>> users destroy a topic and re-create it with the
> >> same
> >>>>> name.
> >>>>>>>>> That's
> >>>>>>>>>>>>> kind of
> >>>>>>>>>>>>>>> tough, though, since in that scenario, mirror maker
> >>>>>> probably
> >>>>>>>>>> should
> >>>>>>>>>>>>> destroy
> >>>>>>>>>>>>>>> and re-create the topic on the other end, too,
> >> right?
> >>>>>>>>> Otherwise,
> >>>>>>>>>>>>> what you
> >>>>>>>>>>>>>>> end up with on the other end could be half of one
> >>>>>>> incarnation
> >>>>>>>> of
> >>>>>>>>>> the
> >>>>>>>>>>>>> topic,
> >>>>>>>>>>>>>>> and half of another.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What mirror maker really needs is to be able to
> >>> follow
> >>>>> a
> >>>>>>>> stream
> >>>>>>>>> of
> >>>>>>>>>>>>> events
> >>>>>>>>>>>>>>> about the kafka cluster itself.  We could have some
> >>>>> master
> >>>>>>>> topic
> >>>>>>>>>>>>> which is
> >>>>>>>>>>>>>>> always present and which contains data about all
> >>> topic
> >>>>>>>>> deletions,
> >>>>>>>>>>>>>>> creations, etc.  Then MM can simply follow this
> >> topic
> >>>>> and
> >>>>>> do
> >>>>>>>>> what
> >>>>>>>>>> is
> >>>>>>>>>>>>> needed.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Then the next question maybe, should we use a
> >>>>> global
> >>>>>>>>>>>>> metadataEpoch +
> >>>>>>>>>>>>>>>>>> per-partition partitionEpoch, instead of
> >> using
> >>>>>>>>> per-partition
> >>>>>>>>>>>>>>> leaderEpoch
> >>>>>>>>>>>>>>>>> +
> >>>>>>>>>>>>>>>>>> per-partition leaderEpoch. The former
> >> solution
> >>>>> using
> >>>>>>>>>>>>> metadataEpoch
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>> not work due to the following scenario
> >>> (provided
> >>>>> by
> >>>>>>>> Jun):
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> "Consider the following scenario. In metadata
> >>> v1,
> >>>>>> the
> >>>>>>>>> leader
> >>>>>>>>>>>>> for a
> >>>>>>>>>>>>>>>>>> partition is at broker 1. In metadata v2,
> >>> leader
> >>>>> is
> >>>>>> at
> >>>>>>>>>> broker
> >>>>>>>>>>>>> 2. In
> >>>>>>>>>>>>>>>>>> metadata v3, leader is at broker 1 again. The
> >>>>> last
> >>>>>>>>> committed
> >>>>>>>>>>>>> offset
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> v1,
> >>>>>>>>>>>>>>>>>> v2 and v3 are 10, 20 and 30, respectively. A
> >>>>>> consumer
> >>>>>>> is
> >>>>>>>>>>>>> started and
> >>>>>>>>>>>>>>>>> reads
> >>>>>>>>>>>>>>>>>> metadata v1 and reads messages from offset 0
> >> to
> >>>>> 25
> >>>>>>> from
> >>>>>>>>>> broker
> >>>>>>>>>>>>> 1. My
> >>>>>>>>>>>>>>>>>> understanding is that in the current
> >> proposal,
> >>>>> the
> >>>>>>>>> metadata
> >>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>> associated with offset 25 is v1. The consumer
> >>> is
> >>>>>> then
> >>>>>>>>>> restarted
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> fetches
> >>>>>>>>>>>>>>>>>> metadata v2. The consumer tries to read from
> >>>>> broker
> >>>>>> 2,
> >>>>>>>>>> which is
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> old
> >>>>>>>>>>>>>>>>>> leader with the last offset at 20. In this
> >>> case,
> >>>>> the
> >>>>>>>>>> consumer
> >>>>>>>>>>>>> will
> >>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>>>> get OffsetOutOfRangeException incorrectly."
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regarding your comment "For the second
> >> purpose,
> >>>>> this
> >>>>>>> is
> >>>>>>>>>> "soft
> >>>>>>>>>>>>> state"
> >>>>>>>>>>>>>>>>>> anyway.  If the client thinks X is the leader
> >>>>> but Y
> >>>>>> is
> >>>>>>>>>> really
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> leader,
> >>>>>>>>>>>>>>>>>> the client will talk to X, and X will point
> >> out
> >>>>> its
> >>>>>>>>> mistake
> >>>>>>>>>> by
> >>>>>>>>>>>>>>> sending
> >>>>>>>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>>> a NOT_LEADER_FOR_PARTITION.", it is probably
> >> no
> >>>>>> true.
> >>>>>>>> The
> >>>>>>>>>>>>> problem
> >>>>>>>>>>>>>>> here is
> >>>>>>>>>>>>>>>>>> that the old leader X may still think it is
> >> the
> >>>>>> leader
> >>>>>>>> of
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> thus it will not send back
> >>>>> NOT_LEADER_FOR_PARTITION.
> >>>>>>> The
> >>>>>>>>>> reason
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> provided
> >>>>>>>>>>>>>>>>>> in KAFKA-6262. Can you check if that makes
> >>> sense?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This is solvable with a timeout, right?  If the
> >>>>> leader
> >>>>>>>> can't
> >>>>>>>>>>>>>>> communicate
> >>>>>>>>>>>>>>>>> with the controller for a certain period of
> >> time,
> >>>>> it
> >>>>>>>> should
> >>>>>>>>>> stop
> >>>>>>>>>>>>>>> acting as
> >>>>>>>>>>>>>>>>> the leader.  We have to solve this problem,
> >>>>> anyway, in
> >>>>>>>> order
> >>>>>>>>>> to
> >>>>>>>>>>>>> fix
> >>>>>>>>>>>>>>> all the
> >>>>>>>>>>>>>>>>> corner cases.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure if I fully understand your proposal. The
> >>>>>> proposal
> >>>>>>>>>> seems to
> >>>>>>>>>>>>>>> require
> >>>>>>>>>>>>>>>> non-trivial changes to our existing leadership
> >>>>> election
> >>>>>>>>>> mechanism.
> >>>>>>>>>>>>> Could
> >>>>>>>>>>>>>>>> you provide more detail regarding how it works?
> >> For
> >>>>>>> example,
> >>>>>>>>> how
> >>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>> user choose this timeout, how leader determines
> >>>>> whether
> >>>>>> it
> >>>>>>>> can
> >>>>>>>>>> still
> >>>>>>>>>>>>>>>> communicate with controller, and how this
> >> triggers
> >>>>>>>> controller
> >>>>>>>>> to
> >>>>>>>>>>>>> elect
> >>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>> leader?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Before I come up with any proposal, let me make
> >> sure
> >>> I
> >>>>>>>>> understand
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> problem correctly.  My big question was, what
> >>> prevents
> >>>>>>>>> split-brain
> >>>>>>>>>>>>> here?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let's say I have a partition which is on nodes A,
> >> B,
> >>>>> and
> >>>>>> C,
> >>>>>>>> with
> >>>>>>>>>>>>> min-ISR
> >>>>>>>>>>>>>>> 2.  The controller is D.  At some point, there is a
> >>>>>> network
> >>>>>>>>>> partition
> >>>>>>>>>>>>>>> between A and B and the rest of the cluster.  The
> >>>>>> Controller
> >>>>>>>>>>>>> re-assigns the
> >>>>>>>>>>>>>>> partition to nodes C, D, and E.  But A and B keep
> >>>>> chugging
> >>>>>>>> away,
> >>>>>>>>>> even
> >>>>>>>>>>>>>>> though they can no longer communicate with the
> >>>>> controller.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> At some point, a client with stale metadata writes
> >> to
> >>>>> the
> >>>>>>>>>> partition.
> >>>>>>>>>>>>> It
> >>>>>>>>>>>>>>> still thinks the partition is on node A, B, and C,
> >> so
> >>>>>> that's
> >>>>>>>>>> where it
> >>>>>>>>>>>>> sends
> >>>>>>>>>>>>>>> the data.  It's unable to talk to C, but A and B
> >>> reply
> >>>>>> back
> >>>>>>>> that
> >>>>>>>>>> all
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>> well.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Is this not a case where we could lose data due to
> >>>>> split
> >>>>>>>> brain?
> >>>>>>>>>> Or is
> >>>>>>>>>>>>>>> there a mechanism for preventing this that I
> >> missed?
> >>>>> If
> >>>>>> it
> >>>>>>>> is,
> >>>>>>>>> it
> >>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>> like a pretty serious failure case that we should
> >> be
> >>>>>>> handling
> >>>>>>>>>> with our
> >>>>>>>>>>>>>>> metadata rework.  And I think epoch numbers and
> >>>>> timeouts
> >>>>>>> might
> >>>>>>>>> be
> >>>>>>>>>>>>> part of
> >>>>>>>>>>>>>>> the solution.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Right, split brain can happen if RF=4 and minIsr=2.
> >>>>>> However, I
> >>>>>>>> am
> >>>>>>>>>> not
> >>>>>>>>>>>>> sure
> >>>>>>>>>>>>>> it is a pretty serious issue which we need to address
> >>>>> today.
> >>>>>>>> This
> >>>>>>>>>> can be
> >>>>>>>>>>>>>> prevented by configuring the Kafka topic so that
> >>> minIsr >
> >>>>>>> RF/2.
> >>>>>>>>>>>>> Actually,
> >>>>>>>>>>>>>> if user sets minIsr=2, is there anything reason that
> >>> user
> >>>>>>> wants
> >>>>>>>> to
> >>>>>>>>>> set
> >>>>>>>>>>>>> RF=4
> >>>>>>>>>>>>>> instead of 4?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Introducing timeout in leader election mechanism is
> >>>>>>>> non-trivial. I
> >>>>>>>>>>>>> think we
> >>>>>>>>>>>>>> probably want to do that only if there is good
> >> use-case
> >>>>> that
> >>>>>>> can
> >>>>>>>>> not
> >>>>>>>>>>>>>> otherwise be addressed with the current mechanism.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I still would like to think about these corner cases
> >>> more.
> >>>>>> But
> >>>>>>>>>> perhaps
> >>>>>>>>>>>>> it's not directly related to this KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> regards,
> >>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 10:39 AM, Colin
> >> McCabe
> >>> <
> >>>>>>>>>>>>> cmcc...@apache.org>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for proposing this KIP.  I think a
> >>>>> metadata
> >>>>>>>> epoch
> >>>>>>>>>> is a
> >>>>>>>>>>>>>>> really
> >>>>>>>>>>>>>>>>> good
> >>>>>>>>>>>>>>>>>>> idea.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I read through the DISCUSS thread, but I
> >>> still
> >>>>>> don't
> >>>>>>>>> have
> >>>>>>>>>> a
> >>>>>>>>>>>>> clear
> >>>>>>>>>>>>>>>>> picture
> >>>>>>>>>>>>>>>>>>> of why the proposal uses a metadata epoch
> >> per
> >>>>>>>> partition
> >>>>>>>>>> rather
> >>>>>>>>>>>>>>> than a
> >>>>>>>>>>>>>>>>>>> global metadata epoch.  A metadata epoch
> >> per
> >>>>>>> partition
> >>>>>>>>> is
> >>>>>>>>>>>>> kind of
> >>>>>>>>>>>>>>>>>>> unpleasant-- it's at least 4 extra bytes
> >> per
> >>>>>>> partition
> >>>>>>>>>> that we
> >>>>>>>>>>>>>>> have to
> >>>>>>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>>>> over the wire in every full metadata
> >> request,
> >>>>>> which
> >>>>>>>>> could
> >>>>>>>>>>>>> become
> >>>>>>>>>>>>>>> extra
> >>>>>>>>>>>>>>>>>>> kilobytes on the wire when the number of
> >>>>>> partitions
> >>>>>>>>>> becomes
> >>>>>>>>>>>>> large.
> >>>>>>>>>>>>>>>>> Plus,
> >>>>>>>>>>>>>>>>>>> we have to update all the auxillary classes
> >>> to
> >>>>>>> include
> >>>>>>>>> an
> >>>>>>>>>>>>> epoch.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> We need to have a global metadata epoch
> >>> anyway
> >>>>> to
> >>>>>>>> handle
> >>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>> addition and deletion.  For example, if I
> >>> give
> >>>>> you
> >>>>>>>>>>>>>>>>>>> MetadataResponse{part1,epoch 1, part2,
> >> epoch
> >>> 1}
> >>>>>> and
> >>>>>>>>>> {part1,
> >>>>>>>>>>>>>>> epoch1},
> >>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>> MetadataResponse is newer?  You have no way
> >>> of
> >>>>>>>> knowing.
> >>>>>>>>>> It
> >>>>>>>>>>>>> could
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> part2 has just been created, and the
> >> response
> >>>>>> with 2
> >>>>>>>>>>>>> partitions is
> >>>>>>>>>>>>>>>>> newer.
> >>>>>>>>>>>>>>>>>>> Or it coudl be that part2 has just been
> >>>>> deleted,
> >>>>>> and
> >>>>>>>>>>>>> therefore the
> >>>>>>>>>>>>>>>>> response
> >>>>>>>>>>>>>>>>>>> with 1 partition is newer.  You must have a
> >>>>> global
> >>>>>>>> epoch
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> disambiguate
> >>>>>>>>>>>>>>>>>>> these two cases.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Previously, I worked on the Ceph
> >> distributed
> >>>>>>>> filesystem.
> >>>>>>>>>>>>> Ceph had
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> concept of a map of the whole cluster,
> >>>>> maintained
> >>>>>>> by a
> >>>>>>>>> few
> >>>>>>>>>>>>> servers
> >>>>>>>>>>>>>>>>> doing
> >>>>>>>>>>>>>>>>>>> paxos.  This map was versioned by a single
> >>>>> 64-bit
> >>>>>>>> epoch
> >>>>>>>>>> number
> >>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>> increased on every change.  It was
> >> propagated
> >>>>> to
> >>>>>>>> clients
> >>>>>>>>>>>>> through
> >>>>>>>>>>>>>>>>> gossip.  I
> >>>>>>>>>>>>>>>>>>> wonder if something similar could work
> >> here?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It seems like the the Kafka
> >> MetadataResponse
> >>>>>> serves
> >>>>>>>> two
> >>>>>>>>>>>>> somewhat
> >>>>>>>>>>>>>>>>> unrelated
> >>>>>>>>>>>>>>>>>>> purposes.  Firstly, it lets clients know
> >> what
> >>>>>>>> partitions
> >>>>>>>>>>>>> exist in
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> system and where they live.  Secondly, it
> >>> lets
> >>>>>>> clients
> >>>>>>>>>> know
> >>>>>>>>>>>>> which
> >>>>>>>>>>>>>>> nodes
> >>>>>>>>>>>>>>>>>>> within the partition are in-sync (in the
> >> ISR)
> >>>>> and
> >>>>>>>> which
> >>>>>>>>>> node
> >>>>>>>>>>>>> is the
> >>>>>>>>>>>>>>>>> leader.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The first purpose is what you really need a
> >>>>>> metadata
> >>>>>>>>> epoch
> >>>>>>>>>>>>> for, I
> >>>>>>>>>>>>>>>>> think.
> >>>>>>>>>>>>>>>>>>> You want to know whether a partition exists
> >>> or
> >>>>>> not,
> >>>>>>> or
> >>>>>>>>> you
> >>>>>>>>>>>>> want to
> >>>>>>>>>>>>>>> know
> >>>>>>>>>>>>>>>>>>> which nodes you should talk to in order to
> >>>>> write
> >>>>>> to
> >>>>>>> a
> >>>>>>>>>> given
> >>>>>>>>>>>>>>>>> partition.  A
> >>>>>>>>>>>>>>>>>>> single metadata epoch for the whole
> >> response
> >>>>>> should
> >>>>>>> be
> >>>>>>>>>>>>> adequate
> >>>>>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>>> should not change the partition assignment
> >>>>> without
> >>>>>>>> going
> >>>>>>>>>>>>> through
> >>>>>>>>>>>>>>>>> zookeeper
> >>>>>>>>>>>>>>>>>>> (or a similar system), and this inherently
> >>>>>>> serializes
> >>>>>>>>>> updates
> >>>>>>>>>>>>> into
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> numbered stream.  Brokers should also stop
> >>>>>>> responding
> >>>>>>>> to
> >>>>>>>>>>>>> requests
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>>> are unable to contact ZK for a certain time
> >>>>>> period.
> >>>>>>>>> This
> >>>>>>>>>>>>> prevents
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> case
> >>>>>>>>>>>>>>>>>>> where a given partition has been moved off
> >>> some
> >>>>>> set
> >>>>>>> of
> >>>>>>>>>> nodes,
> >>>>>>>>>>>>> but a
> >>>>>>>>>>>>>>>>> client
> >>>>>>>>>>>>>>>>>>> still ends up talking to those nodes and
> >>>>> writing
> >>>>>>> data
> >>>>>>>>>> there.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For the second purpose, this is "soft
> >> state"
> >>>>>> anyway.
> >>>>>>>> If
> >>>>>>>>>> the
> >>>>>>>>>>>>> client
> >>>>>>>>>>>>>>>>> thinks
> >>>>>>>>>>>>>>>>>>> X is the leader but Y is really the leader,
> >>> the
> >>>>>>> client
> >>>>>>>>>> will
> >>>>>>>>>>>>> talk
> >>>>>>>>>>>>>>> to X,
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> X will point out its mistake by sending
> >> back
> >>> a
> >>>>>>>>>>>>>>>>> NOT_LEADER_FOR_PARTITION.
> >>>>>>>>>>>>>>>>>>> Then the client can update its metadata
> >> again
> >>>>> and
> >>>>>>> find
> >>>>>>>>>> the new
> >>>>>>>>>>>>>>> leader,
> >>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>> there is one.  There is no need for an
> >> epoch
> >>> to
> >>>>>>> handle
> >>>>>>>>>> this.
> >>>>>>>>>>>>>>>>> Similarly, I
> >>>>>>>>>>>>>>>>>>> can't think of a reason why changing the
> >>>>> in-sync
> >>>>>>>> replica
> >>>>>>>>>> set
> >>>>>>>>>>>>> needs
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> bump
> >>>>>>>>>>>>>>>>>>> the epoch.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 09:45, Dong Lin
> >>> wrote:
> >>>>>>>>>>>>>>>>>>>> Thanks much for reviewing the KIP!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 7:10 AM, Guozhang
> >>>>> Wang <
> >>>>>>>>>>>>>>> wangg...@gmail.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Yeah that makes sense, again I'm just
> >>>>> making
> >>>>>>> sure
> >>>>>>>> we
> >>>>>>>>>>>>> understand
> >>>>>>>>>>>>>>>>> all the
> >>>>>>>>>>>>>>>>>>>>> scenarios and what to expect.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I agree that if, more generally
> >> speaking,
> >>>>> say
> >>>>>>>> users
> >>>>>>>>>> have
> >>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>> consumed
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> offset 8, and then call seek(16) to
> >>> "jump"
> >>>>> to
> >>>>>> a
> >>>>>>>>>> further
> >>>>>>>>>>>>>>> position,
> >>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>> she
> >>>>>>>>>>>>>>>>>>>>> needs to be aware that OORE maybe
> >> thrown
> >>>>> and
> >>>>>> she
> >>>>>>>>>> needs to
> >>>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>> it or
> >>>>>>>>>>>>>>>>>>> rely
> >>>>>>>>>>>>>>>>>>>>> on reset policy which should not
> >> surprise
> >>>>> her.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 12:31 AM, Dong
> >>> Lin
> >>>>> <
> >>>>>>>>>>>>>>> lindon...@gmail.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Yes, in general we can not prevent
> >>>>>>>>>>>>> OffsetOutOfRangeException
> >>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>>>>> seeks
> >>>>>>>>>>>>>>>>>>>>>> to a wrong offset. The main goal is
> >> to
> >>>>>> prevent
> >>>>>>>>>>>>>>>>>>> OffsetOutOfRangeException
> >>>>>>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>>>>> user has done things in the right
> >> way,
> >>>>> e.g.
> >>>>>>> user
> >>>>>>>>>> should
> >>>>>>>>>>>>> know
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> message with this offset.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For example, if user calls seek(..)
> >>> right
> >>>>>>> after
> >>>>>>>>>>>>>>> construction, the
> >>>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>> reason I can think of is that user
> >>> stores
> >>>>>>> offset
> >>>>>>>>>>>>> externally.
> >>>>>>>>>>>>>>> In
> >>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>> case,
> >>>>>>>>>>>>>>>>>>>>>> user currently needs to use the
> >> offset
> >>>>> which
> >>>>>>> is
> >>>>>>>>>> obtained
> >>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>> position(..)
> >>>>>>>>>>>>>>>>>>>>>> from the last run. With this KIP,
> >> user
> >>>>> needs
> >>>>>>> to
> >>>>>>>>> get
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> offset
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> offsetEpoch using
> >>>>>> positionAndOffsetEpoch(...)
> >>>>>>>> and
> >>>>>>>>>> stores
> >>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>> information
> >>>>>>>>>>>>>>>>>>>>>> externally. The next time user starts
> >>>>>>> consumer,
> >>>>>>>>>> he/she
> >>>>>>>>>>>>> needs
> >>>>>>>>>>>>>>> to
> >>>>>

Reply via email to