Hey Eno,

I forgot about that. Sure, that works for us.

Thanks,
Dong

On Thu, Feb 2, 2017 at 2:03 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Dong,
>
> The KIP meetings are traditionally held at 11am. Would that also work? So
> Tuesday 7th at 11am?
>
> Thanks
> Eno
>
> > On 2 Feb 2017, at 02:53, Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hey Eno, Colin,
> >
> > Would you have time next Tuesday morning to discuss the KIP? How about
> > 10-11 am?
> >
> > To make the best use of our time, can you please invite one or more
> > committers from Confluent to join the meeting? I hope the KIP can receive
> > one or more +1s from committers at Confluent if there are no concerns
> > about the KIP after the KIP meeting.
> >
> > In the meantime, please feel free to provide comments in the thread so
> > that the discussion in the KIP meeting can be more efficient.
> >
> > Thanks,
> > Dong
> >
> > On Wed, Feb 1, 2017 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> >> Hey Colin,
> >>
> >> Thanks much for the comment. Please see my reply inline.
> >>
> >> On Wed, Feb 1, 2017 at 1:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >>
> >>> On Wed, Feb 1, 2017, at 11:35, Dong Lin wrote:
> >>>> Hey Grant, Colin,
> >>>>
> >>>> My bad, I misunderstood Grant's suggestion initially. Indeed this is a
> >>>> very interesting idea to just wait replica.lag.time.max.ms for the
> >>>> replica on the bad disk to drop out of the ISR, instead of having the
> >>>> broker actively report this to the controller.
> >>>>
> >>>> I have several concerns with this approach.
> >>>>
> >>>> - The broker needs to persist the information of all partitions that
> >>>> it has created, in a file on each disk. This is needed for the broker
> >>>> to know which replicas were already created on the bad disks that it
> >>>> cannot access. If we don't do this, then when the controller sends a
> >>>> LeaderAndIsrRequest to a broker to become follower for a partition on
> >>>> the bad disk, the broker will create the partition on a good disk. The
> >>>> good disks may then be overloaded as a cascading effect.
> >>>>
> >>>> While it is possible to let the broker keep track of the replicas it
> >>>> has created, I think it is less clean than the approach in the current
> >>>> KIP, for the reasons provided in the rejected alternatives section.
> >>>>
> >>>> - Changes are needed in the controller logic to handle the failure to
> >>>> make a broker the leader when the controller receives a
> >>>> LeaderAndIsrResponse. Otherwise, things go wrong if a partition on the
> >>>> bad disk is asked to become leader. As of now, the controller doesn't
> >>>> handle errors in the LeaderAndIsrResponse.
> >>>>
> >>>> - We still need tools and mechanisms for the administrator to know
> >>>> whether a replica is offline due to a bad disk. I am worried that
> >>>> asking administrators to log into a machine and dig this information
> >>>> out of the logs is not scalable when the number of brokers is large.
> >>>> Although each company can develop internal tools to get this
> >>>> information, it is a waste of developer time to reinvent the wheel.
> >>>> Reading this information from the logs also seems less reliable than
> >>>> getting it from a Kafka request/response.
> >>>>
> >>>> I guess the goal of this alternative approach is to avoid making major
> >>>> changes in Kafka at the cost of increased disk-failure discovery time,
> >>>> etc. But I think the changes required to fix the problems above won't
> >>>> be much smaller.
> >>>
> >>> Thanks for the thoughtful replies, Dong L.
> >>>
> >>> Instead of having an "offline" state, how about having a "creating"
> >>> state for replicas and a "created" state?  Then if a replica was not
> >>> accessible on any disk, but still in "created" state, the broker could
> >>> know that something had gone wrong.  This also would catch issues like
> >>> the broker being started without all log directories configured, or
> >>> disks not being correctly mounted at the expected mount points, leading
> >>> to empty directories.
> >>>
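> >>> A minimal sketch of the two-state idea (the names here are
> >>> hypothetical, just to make it concrete):
> >>>
> >>>     enum ReplicaDiskState { CREATING, CREATED }
> >>>
> >>>     // A replica recorded as CREATED but found on no accessible log
> >>>     // directory means a disk or mount has gone missing, so the broker
> >>>     // can flag it instead of silently recreating it on another disk.
> >>>     boolean looksLost(TopicPartition tp, ReplicaDiskState state) {
> >>>         return state == ReplicaDiskState.CREATED && !foundOnAnyLogDir(tp);
> >>>     }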
> >>
> >> Indeed, we need to have an additional state per replica to solve this
> >> problem. The current KIP design addresses the problem by putting the
> >> "created" state in ZooKeeper, as you can see in the public interface
> >> changes of the KIP. Are you suggesting that we solve the problem by
> >> storing this information on the broker's local disk instead of in
> >> ZooKeeper? I have two concerns with this approach:
> >>
> >> - It requires the broker to keep track of the replicas it has created.
> >> This solution splits the task of determining offline replicas between
> >> the controller and the brokers, as opposed to the current Kafka design,
> >> where the controller determines the states of replicas and propagates
> >> this information to brokers. We think it is less error-prone to keep the
> >> controller as the only entity that maintains the metadata (e.g. replica
> >> states) of the Kafka cluster.
> >>
> >> - If we store this information on local disk, then we need an
> >> additional request/response protocol to ask the broker to reset this
> >> information, e.g. after a bad disk is replaced with a good disk, so that
> >> the replica can be re-created on a good disk. Things are easier if we
> >> store this information in ZooKeeper.
> >>
> >>
> >>>
> >>>>
> >>>> To answer Colin's questions:
> >>>>
> >>>> - There is no action required on the administrator's side in case of
> >>>> a log directory failure.
> >>>>
> >>>> - The broker itself is going to discover log directory failures and
> >>>> declare offline replicas. The broker doesn't explicitly declare log
> >>>> directory failure. But the administrator can learn from the
> >>>> MetadataResponse that a replica is offline due to disk failure, i.e.
> >>>> when the replica is offline but the broker is online.
> >>>
> >>> Can you expand on this a little bit?  It sounds like you are
> >>> considering dealing with failures on a replica-by-replica basis, rather
> >>> than a disk-by-disk basis.  But it's disks that fail, not really
> >>> individual files or directories on disks.  This decision interacts
> >>> poorly with the lack of a periodic scanner.  It's easy to imagine a
> >>> scenario where an infrequently used replica sits on a dead disk for a
> >>> long time without us declaring it dead.
> >>>
> >>
> >> Sure. The broker will deal with failures on a disk-by-disk basis: all
> >> replicas on a disk are failed if there is any disk-related exception
> >> when the broker accesses that disk. That means if any replica on a disk
> >> cannot be read, then all replicas on that disk are considered offline.
> >> Since the controller doesn't know the disk names of replicas, it has to
> >> learn the liveness of replicas on a replica-by-replica basis in order to
> >> do leader election.
> >>
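> >> To make that concrete, here is a rough Java sketch of the broker-side
> >> bookkeeping (names like maybeFailLogDir, offlineLogDirs and
> >> replicasOnDir are illustrative, not from the actual patch):
> >>
> >>     // Any I/O error on a log directory fails every replica on that
> >>     // directory in one shot; the controller then learns the
> >>     // per-replica liveness and re-elects leaders for the affected
> >>     // partitions.
> >>     void maybeFailLogDir(File dir, Throwable cause) {
> >>         if (!offlineLogDirs.add(dir))
> >>             return;                      // directory already marked offline
> >>         offlineReplicas.addAll(replicasOnDir(dir));
> >>         log.error("Marking log directory " + dir + " offline", cause);
> >>     }
> >>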
> >> Besides, I don't think we will have a problem with the scenario you
> >> described. If a replica is indeed not touched for a long time, then it
> >> doesn't matter whether it is considered dead or not. The moment it is
> >> needed, either for read or for write, the KIP makes sure that we know
> >> its state and make the leader election decision accordingly.
> >>
> >>
> >>>>
> >>>> - This KIP does not handle cases where a few disks on a broker are
> >>>> full but the others have space. If a disk is full and cannot be
> >>>> written to, then the disk is considered to have failed. The imbalance
> >>>> across disks is an existing problem and will be handled in KIP-113.
> >>>
> >>> OK.
> >>>
> >>>>
> >>>> - This KIP does not add a disk scanner that periodically checks for
> >>>> error conditions, and it doesn't handle performance degradation of
> >>>> disks. We wait for a failure to happen before declaring a disk bad.
> >>>>
> >>>> Yes, this KIP requires us to fix cases in the code where we are
> >>>> suppressing disk errors or ignoring their root cause. But the decision
> >>>> of which exceptions should be considered disk failures, and how to
> >>>> handle each of them, is more of an implementation detail. I hope we
> >>>> can focus on the design and high-level idea of this KIP and only worry
> >>>> about specific exceptions when the patch is being reviewed.
> >>>
> >>> Hmm... I think we should discuss how we are going to harden the code
> >>> against disk failures, and verify that it has been hardened.  Or maybe
> >>> we could do this in a follow-up KIP.
> >>>
> >>
> >> By "harden the code against disk errors" do you mean that we should
> make a
> >> full list of disk-related exception we may see and decide if we should
> >> treat each of these differently? I agree it is useful in the long term.
> But
> >> this is actually out of the scope of this KIP. The KIP will re-use the
> >> existing KafkaStorageException without having to change what exceptions
> are
> >> considered KafkaStorageException. The goal is to fail replicas on a disk
> >> instead of crashing the broker when KafkaStorageException is observed.
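> >>
> >> As an illustration (not actual code from the patch), where the broker
> >> today effectively does something like
> >>
> >>     try {
> >>         partitionLog.append(records);
> >>     } catch (KafkaStorageException e) {
> >>         Runtime.getRuntime().halt(1);   // today: the whole broker goes down
> >>     }
> >>
> >> the KIP would instead confine the damage to the one disk:
> >>
> >>     try {
> >>         partitionLog.append(records);
> >>     } catch (KafkaStorageException e) {
> >>         maybeFailLogDir(partitionLog.dir(), e);  // only this disk's replicas go offline
> >>     }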
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >>>> After all, we will probably only know the list of exceptions and the
> >>>> ways to handle them when we start to implement the KIP. And we need to
> >>>> improve this list over time as we discover various failures in
> >>>> deployments.
> >>>>
> >>>>
> >>>> Hey Eno,
> >>>>
> >>>> Sure thing. Thanks for offering time to have a KIP meeting to discuss
> >>>> this. I will ask the other Kafka developers at LinkedIn about their
> >>>> availability.
> >>>
> >>> Yeah, it would be nice to talk about this.
> >>
> >>
> >>> regards,
> >>> Colin
> >>>
> >>>
> >>>>
> >>>> Thanks,
> >>>> Dong
> >>>>
> >>>>
> >>>> On Wed, Feb 1, 2017 at 10:37 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>
> >>>>> Hi Dong,
> >>>>>
> >>>>> Would it make sense to do a discussion over video/voice about this? I
> >>>>> think it's sufficiently complex that we can probably make quicker
> >>>>> progress that way? So shall we do a KIP meeting soon? I can do this
> >>>>> week (Thu/Fri) or next week.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>> On 1 Feb 2017, at 18:29, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>
> >>>>>> Hmm.  Maybe I misinterpreted, but I got the impression that Grant
> >>>>>> was suggesting that we avoid introducing this concept of "offline
> >>>>>> replicas" for now.  Is that feasible?
> >>>>>>
> >>>>>> What is the strategy for declaring a log directory bad?  Is it an
> >>>>>> administrative action?  Or is the broker itself going to be
> >>>>>> responsible for this?  How do we handle cases where a few disks on a
> >>>>>> broker are full, but the others have space?
> >>>>>>
> >>>>>> Are we going to have a disk scanner that will periodically check for
> >>>>>> error conditions (similar to the background checks that RAID
> >>>>>> controllers do)?  Or will we wait for a failure to happen before
> >>>>>> declaring a disk bad?
> >>>>>>
> >>>>>> It seems to me that if we want this to work well we will need to fix
> >>>>>> cases in the code where we are suppressing disk errors or ignoring
> >>>>>> their root cause.  For example, any place where we are using the old
> >>>>>> Java APIs that just return a boolean on failure will need to be
> >>>>>> fixed, since the failure could now be disk full, permission denied,
> >>>>>> or IOE, and we will need to handle those cases differently.  Also,
> >>>>>> we will need to harden the code against disk errors.  Formerly it
> >>>>>> was OK to just crash on a disk error; now it is not.  It would be
> >>>>>> nice to see more in the test plan about injecting IOExceptions into
> >>>>>> disk handling code and verifying that we can handle it correctly.
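> >>>>>>
> >>>>>> A generic Java illustration of the boolean-returning problem (not a
> >>>>>> specific spot in the Kafka code):
> >>>>>>
> >>>>>>     File f = new File("/data/kafka-logs/old-segment.log");
> >>>>>>     if (!f.delete()) {
> >>>>>>         // permission denied? file already gone? dying disk?
> >>>>>>         // java.io.File.delete() cannot tell us which it was.
> >>>>>>     }
> >>>>>>
> >>>>>> whereas java.nio.file.Files.delete(Path) throws a typed exception
> >>>>>> (NoSuchFileException, AccessDeniedException, or another IOException),
> >>>>>> so each failure mode can be handled differently.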
> >>>>>>
> >>>>>> regards,
> >>>>>> Colin
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Feb 1, 2017, at 10:02, Dong Lin wrote:
> >>>>>>> Hey Grant,
> >>>>>>>
> >>>>>>> Yes, this KIP does exactly what you described:)
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>> On Wed, Feb 1, 2017 at 9:45 AM, Grant Henke <ghe...@cloudera.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Dong,
> >>>>>>>>
> >>>>>>>> Thanks for putting this together.
> >>>>>>>>
> >>>>>>>> Since we are discussing alternative/simplified options: have you
> >>>>>>>> considered handling the disk failures broker-side to prevent a
> >>>>>>>> crash, marking the disk as "bad" to that individual broker, and
> >>>>>>>> continuing as normal? I imagine the broker would then fall out of
> >>>>>>>> sync for the replicas hosted on the bad disk and the ISR would
> >>>>>>>> shrink. This would allow people using min.isr to keep their data
> >>>>>>>> safe, and the cluster operators would see a shrink in many ISRs
> >>>>>>>> and hopefully an obvious log message leading to a quick fix. I
> >>>>>>>> haven't thought through this idea in depth though, so there could
> >>>>>>>> be some shortfalls.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Grant
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Feb 1, 2017 at 11:21 AM, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hey Eno,
> >>>>>>>>>
> >>>>>>>>> Thanks much for the review.
> >>>>>>>>>
> >>>>>>>>> I think your suggestion is to split the disks of a machine into
> >>>>>>>>> multiple disk sets and run one broker per disk set. Yeah, this is
> >>>>>>>>> similar to Colin's suggestion of one-broker-per-disk, which we
> >>>>>>>>> have evaluated at LinkedIn and considered to be a good short-term
> >>>>>>>>> approach.
> >>>>>>>>>
> >>>>>>>>> As of now I don't think any of these approaches is a better
> >>>>>>>>> alternative in the long term. I will summarize them here; I have
> >>>>>>>>> also put these reasons in the KIP's motivation section and
> >>>>>>>>> rejected alternatives section. I am happy to discuss more, and I
> >>>>>>>>> would certainly like to use an alternative solution that is
> >>>>>>>>> easier to do and has better performance.
> >>>>>>>>>
> >>>>>>>>> - JBOD vs. RAID-10: if we switch from RAID-10 with
> >>>>>>>>> replication-factor=2 to JBOD with replication-factor=3, we get a
> >>>>>>>>> 25% reduction in disk usage and double the tolerance of broker
> >>>>>>>>> failures before data unavailability, from 1 to 2. This is a
> >>>>>>>>> pretty huge gain for any company that uses Kafka at large scale.
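> >>>>>>>>>
> >>>>>>>>> (Spelling out the arithmetic: RAID-10 mirrors every write, so
> >>>>>>>>> replication-factor=2 on RAID-10 stores 2 x 2 = 4 physical copies
> >>>>>>>>> of each message, while JBOD with replication-factor=3 stores 3,
> >>>>>>>>> i.e. 1 - 3/4 = 25% less disk. And replication-factor=2 tolerates
> >>>>>>>>> only 1 broker failure before data may be unavailable, versus 2
> >>>>>>>>> with replication-factor=3.)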
> >>>>>>>>>
> >>>>>>>>> - JBOD vs. one-broker-per-disk: The benefit of
> >>>>>>>>> one-broker-per-disk is that no major code change is needed in
> >>>>>>>>> Kafka. Among the disadvantages of one-broker-per-disk summarized
> >>>>>>>>> in the KIP and the previous emails with Colin, the biggest one is
> >>>>>>>>> the 15% throughput loss compared to JBOD and the reduced
> >>>>>>>>> flexibility to balance load across disks. Further, it probably
> >>>>>>>>> requires changes to the internal deployment tools at various
> >>>>>>>>> companies to deal with the one-broker-per-disk setup.
> >>>>>>>>>
> >>>>>>>>> - JBOD vs. RAID-0: This is the setup used at Microsoft. The
> >>>>>>>>> problem is that a broker becomes unavailable if any disk fails.
> >>>>>>>>> Suppose replication-factor=2 and there are 10 disks per machine.
> >>>>>>>>> Then the probability of any message becoming unavailable due to
> >>>>>>>>> disk failure with RAID-0 is 100X higher than with JBOD.
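> >>>>>>>>>
> >>>>>>>>> (Roughly: if each disk independently fails with probability p in
> >>>>>>>>> some window, a 10-disk RAID-0 broker loses its whole volume with
> >>>>>>>>> probability about 10p, so a message with 2 replicas becomes
> >>>>>>>>> unavailable with probability about (10p)^2 = 100p^2. With JBOD
> >>>>>>>>> the message is unavailable only if the 2 specific disks holding
> >>>>>>>>> its replicas both fail, i.e. p^2. Hence the 100X.)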
> >>>>>>>>>
> >>>>>>>>> - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disks is
> >>>>>>>>> somewhere between one-broker-per-disk and RAID-0, so it carries
> >>>>>>>>> an average of the disadvantages of those two approaches.
> >>>>>>>>>
> >>>>>>>>> To answer your question: I think it is reasonable to manage disks
> >>>>>>>>> in Kafka. By "managing disks" we mean the management of the
> >>>>>>>>> assignment of replicas across disks. Here are my reasons in more
> >>>>>>>>> detail:
> >>>>>>>>>
> >>>>>>>>> - I don't think this KIP is a big step change. By allowing users
> >>>>>>>>> to configure Kafka to run with multiple log directories or disks,
> >>>>>>>>> as it does now, it is implicit that Kafka manages disks. It is
> >>>>>>>>> just not a complete feature. Microsoft and probably other
> >>>>>>>>> companies are using this feature despite the undesirable effect
> >>>>>>>>> that a broker will fail if any disk fails. It is good to complete
> >>>>>>>>> this feature.
> >>>>>>>>>
> >>>>>>>>> - I think it is reasonable to manage disks in Kafka. One of the
> >>>>>>>>> most important things Kafka does is to determine the replica
> >>>>>>>>> assignment across brokers and make sure enough copies of a given
> >>>>>>>>> replica are available. I would argue that determining the replica
> >>>>>>>>> assignment across disks is conceptually not much different.
> >>>>>>>>>
> >>>>>>>>> - I would agree that this KIP improves the performance of Kafka
> >>>>>>>>> at the cost of more complexity inside Kafka, by switching from
> >>>>>>>>> RAID-10 to JBOD. I would argue that this is the right direction.
> >>>>>>>>> If we could gain 20%+ performance by managing NICs in Kafka as
> >>>>>>>>> compared to the existing approach and other alternatives, I would
> >>>>>>>>> say we should just do it. Such a gain in performance, or
> >>>>>>>>> equivalently reduction in cost, can save millions of dollars per
> >>>>>>>>> year for any company running Kafka at large scale.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Dong
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> I'm coming somewhat late to the discussion, apologies for that.
> >>>>>>>>>>
> >>>>>>>>>> I'm worried about this proposal. It's moving Kafka to a world
> >>>>>>>>>> where it manages disks. So in a sense the scope of the KIP is
> >>>>>>>>>> limited, but the direction it sets for Kafka is quite a big step
> >>>>>>>>>> change. Fundamentally this is about balancing resources for a
> >>>>>>>>>> Kafka broker. This can be done by a tool, rather than by
> >>>>>>>>>> changing Kafka. E.g., the tool would take a bunch of disks
> >>>>>>>>>> together, create a volume over them and export that to a Kafka
> >>>>>>>>>> broker (in addition to setting the memory limits for that broker
> >>>>>>>>>> or limiting other resources). A different bunch of disks can
> >>>>>>>>>> then make up a second volume, and be used by another Kafka
> >>>>>>>>>> broker. This is aligned with what Colin is saying (as I
> >>>>>>>>>> understand it).
> >>>>>>>>>>
> >>>>>>>>>> Disks are not the only resource on a machine; there are several
> >>>>>>>>>> instances where multiple NICs are used, for example. Do we want
> >>>>>>>>>> fine-grained management of all these resources? I'd argue that
> >>>>>>>>>> opens the system up to a lot of complexity.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>> Eno
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> I am going to initiate the vote if there is no further concern
> >>>>>>>>>>> with the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Dong
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Jan 27, 2017 at 8:08 PM, radai <radai.rosenbl...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> a few extra points:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. broker per disk might also incur more client <--> broker
> >>>>>>>>>>>> sockets: suppose every producer / consumer "talks" to >1
> >>>>>>>>>>>> partition, there's a very good chance that partitions that
> >>>>>>>>>>>> were co-located on a single 10-disk broker would now be split
> >>>>>>>>>>>> between several single-disk broker processes on the same
> >>>>>>>>>>>> machine. hard to put a multiplier on this, but likely >x1.
> >>>>>>>>>>>> sockets are a limited resource at the OS level and incur some
> >>>>>>>>>>>> memory cost (kernel buffers)
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. there's a memory overhead to spinning up a JVM (compiled
> >>>>>>>>>>>> code and byte code objects etc). if we assume this overhead is
> >>>>>>>>>>>> ~300 MB (order of magnitude, specifics vary) then spinning up
> >>>>>>>>>>>> 10 JVMs would lose you 3 GB of RAM. not a ton, but non
> >>>>>>>>>>>> negligible.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. there would also be some overhead downstream of kafka in
> >>>>>>>>>>>> any management / monitoring / log aggregation system. likely
> >>>>>>>>>>>> less than x10 though.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4. (related to above) - added complexity of administration
> >>>>>>>>>>>> with more running instances.
> >>>>>>>>>>>>
> >>>>>>>>>>>> is anyone running kafka with anywhere near 100GB heaps? i
> >>>>>>>>>>>> thought the point was to rely on kernel page cache to do the
> >>>>>>>>>>>> disk buffering ....
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks much for the comment. Please see my comments inline.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
> >>>>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Good point! Yeah, we have actually considered and tested
> >>>>>>>>>>>>>>> this solution, which we call one-broker-per-disk. It would
> >>>>>>>>>>>>>>> work and should require no major change in Kafka as
> >>>>>>>>>>>>>>> compared to this JBOD KIP, so it would be a good short-term
> >>>>>>>>>>>>>>> solution.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> But it has a few drawbacks which make it less desirable in
> >>>>>>>>>>>>>>> the long term. Assume we have 10 disks on a machine. Here
> >>>>>>>>>>>>>>> are the problems:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the thoughtful reply.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) Our stress test result shows that one-broker-per-disk
> >>>>>>>>>>>>>>> has 15% lower throughput
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) The controller would need to send 10X as many
> >>>>>>>>>>>>>>> LeaderAndIsrRequests, UpdateMetadataRequests and
> >>>>>>>>>>>>>>> StopReplicaRequests. This increases the burden on the
> >>>>>>>>>>>>>>> controller, which can be the performance bottleneck.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Maybe I'm misunderstanding something, but there would not be
> >>>>>>>>>>>>>> 10x as many StopReplicaRequest RPCs, would there?  The other
> >>>>>>>>>>>>>> requests would increase 10x, but from a pretty low base,
> >>>>>>>>>>>>>> right?  We are not reassigning partitions all the time, I
> >>>>>>>>>>>>>> hope (or else we have bigger problems...)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think the controller will group StopReplicaRequests per
> >>>>>>>>>>>>> broker and send only one StopReplicaRequest to a broker
> >>>>>>>>>>>>> during controlled shutdown. Anyway, we don't have to worry
> >>>>>>>>>>>>> about this if we agree that other requests will increase by
> >>>>>>>>>>>>> 10X. One MetadataRequest is sent to each broker in the
> >>>>>>>>>>>>> cluster every time there is a leadership change. I am not
> >>>>>>>>>>>>> sure this is a real problem. But in theory this makes the
> >>>>>>>>>>>>> overhead complexity O(number of brokers) and may be a concern
> >>>>>>>>>>>>> in the future. Ideally we should avoid it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3) Less efficient use of physical resources on the machine.
> >>>>>>>>>>>>>>> The number of sockets on each machine will increase by 10X.
> >>>>>>>>>>>>>>> The number of connections between any two machines will
> >>>>>>>>>>>>>>> increase by 100X.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 4) Less efficient management of memory and quotas.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 5) Rebalancing between disks/brokers on the same machine
> >>>>>>>>>>>>>>> will be less efficient and less flexible. A broker has to
> >>>>>>>>>>>>>>> read data from another broker on the same machine via a
> >>>>>>>>>>>>>>> socket. It is also harder to do automatic load balancing
> >>>>>>>>>>>>>>> between disks on the same machine in the future.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I will put this and the explanation in the rejected
> >>>>>>>>>>>>>>> alternatives section. I have a few questions:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - Can you explain why this solution can help avoid the
> >>>>>>>>>>>>>>> scalability bottleneck? I actually think it will exacerbate
> >>>>>>>>>>>>>>> the scalability problem due to 2) above.
> >>>>>>>>>>>>>>> - Why can we push more RPCs with this solution?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> To really answer this question we'd have to take a deep dive
> >>>>>>>>>>>>>> into the locking of the broker and figure out how
> >>>>>>>>>>>>>> effectively it can parallelize truly independent requests.
> >>>>>>>>>>>>>> Almost every multithreaded process is going to have shared
> >>>>>>>>>>>>>> state, like shared queues or shared sockets, that is going
> >>>>>>>>>>>>>> to make scaling less than linear when you add disks or
> >>>>>>>>>>>>>> processors.  (And clearly, another option is to improve that
> >>>>>>>>>>>>>> scalability, rather than going multi-process!)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Yeah, I also think it is better to improve scalability inside
> >>>>>>>>>>>>> the Kafka code if possible. I am not sure we currently have
> >>>>>>>>>>>>> any scalability issue inside Kafka that cannot be removed
> >>>>>>>>>>>>> without going multi-process.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - It is true that a garbage collection in one broker would
> >>>>>>>>>>>>>>> not affect the others. But that is only after every broker
> >>>>>>>>>>>>>>> uses 1/10 of the memory. Can we be sure that this will
> >>>>>>>>>>>>>>> actually help performance?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The big question is, how much memory do Kafka brokers use
> >>>>>>>>>>>>>> now, and how much will they use in the future?  Our
> >>>>>>>>>>>>>> experience in HDFS was that once you start getting more than
> >>>>>>>>>>>>>> 100-200GB Java heap sizes, full GCs start taking minutes to
> >>>>>>>>>>>>>> finish when using the standard JVMs.  That alone is a good
> >>>>>>>>>>>>>> reason to go multi-process or consider storing more things
> >>>>>>>>>>>>>> off the Java heap.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I see. Now I agree one-broker-per-disk should be more
> >>>>>>>>>>>>> efficient in terms of GC, since each broker probably needs
> >>>>>>>>>>>>> less than 1/10 of the memory available on a typical machine
> >>>>>>>>>>>>> nowadays. I will remove this from the reasons for rejection.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Disk failure is the "easy" case.  The "hard" case, which is
> >>>>>>>>>>>>>> unfortunately also the much more common case, is disk
> >>>>> misbehavior.
> >>>>>>>>>>>>>> Towards the end of their lives, disks tend to start slowing
> >>> down
> >>>>>>>>>>>>>> unpredictably.  Requests that would have completed
> >>> immediately
> >>>>>>>>> before
> >>>>>>>>>>>>>> start taking 20, 100 500 milliseconds.  Some files may be
> >>>>> readable
> >>>>>>>>> and
> >>>>>>>>>>>>>> other files may not be.  System calls hang, sometimes
> >>> forever,
> >>>>> and
> >>>>>>>>> the
> >>>>>>>>>>>>>> Java process can't abort them, because the hang is in the
> >>> kernel.
> >>>>>>>>> It
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>> not fun when threads are stuck in "D state"
> >>>>>>>>>>>>>> http://stackoverflow.com/quest
> >>> ions/20423521/process-perminan
> >>>>>>>>>>>>>> tly-stuck-on-d-state
> >>>>>>>>>>>>>> .  Even kill -9 cannot abort the thread then.  Fortunately,
> >>> this
> >>>>>>>> is
> >>>>>>>>>>>>>> rare.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree it is a harder problem, and it is rare. We probably
> >>>>>>>>>>>>> don't have to worry about it in this KIP since this issue is
> >>>>>>>>>>>>> orthogonal to whether or not we use JBOD.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Another approach we should consider is for Kafka to
> >>>>>>>>>>>>>> implement its own storage layer that would stripe across
> >>>>>>>>>>>>>> multiple disks.  This wouldn't have to be done at the block
> >>>>>>>>>>>>>> level, but could be done at the file level.  We could use
> >>>>>>>>>>>>>> consistent hashing to determine which disks a file should
> >>>>>>>>>>>>>> end up on, for example.
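> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> A toy Java sketch of that file-level placement, purely
> >>>>>>>>>>>>>> illustrative:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     // Consistent-hash ring from file names to disks; adding or
> >>>>>>>>>>>>>>     // removing a disk only remaps the files that land on it.
> >>>>>>>>>>>>>>     TreeMap<Integer, File> ring = new TreeMap<>();
> >>>>>>>>>>>>>>     for (File disk : disks)
> >>>>>>>>>>>>>>         for (int i = 0; i < 100; i++)        // virtual nodes
> >>>>>>>>>>>>>>             ring.put((disk.getPath() + "#" + i).hashCode(), disk);
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     File diskFor(String fileName) {
> >>>>>>>>>>>>>>         Map.Entry<Integer, File> e = ring.ceilingEntry(fileName.hashCode());
> >>>>>>>>>>>>>>         return (e != null) ? e.getValue() : ring.firstEntry().getValue();
> >>>>>>>>>>>>>>     }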
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Are you suggesting that we should distribute the log, or log
> >>>>>>>>>>>>> segments, across the disks of brokers? I am not sure I fully
> >>>>>>>>>>>>> understand this approach. My gut feeling is that this would
> >>>>>>>>>>>>> be a drastic solution that would require non-trivial design
> >>>>>>>>>>>>> work. While this may be useful to Kafka, I would prefer not
> >>>>>>>>>>>>> to discuss this in detail in this thread unless you believe
> >>>>>>>>>>>>> it is strictly superior to the design in this KIP in terms of
> >>>>>>>>>>>>> solving our use-case.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the writeup!  It's very interesting.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I apologize in advance if this has been discussed
> >>>>>>>>>>>>>>>> somewhere else.  But I am curious if you have considered
> >>>>>>>>>>>>>>>> the solution of running multiple brokers per node.
> >>>>>>>>>>>>>>>> Clearly there is a memory overhead with this solution
> >>>>>>>>>>>>>>>> because of the fixed cost of starting multiple JVMs.
> >>>>>>>>>>>>>>>> However, running multiple JVMs would help avoid
> >>>>>>>>>>>>>>>> scalability bottlenecks.  You could probably push more
> >>>>>>>>>>>>>>>> RPCs per second, for example.  A garbage collection in one
> >>>>>>>>>>>>>>>> broker would not affect the others.  It would be
> >>>>>>>>>>>>>>>> interesting to see this considered in the "alternate
> >>>>>>>>>>>>>>>> designs" section, even if you end up deciding it's not the
> >>>>>>>>>>>>>>>> way to go.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please
> >>>>>>>>>>>>>>>>> find the KIP wiki at
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This KIP is related to KIP-113
> >>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
> >>>>>>>>>>>>>>>>> Support replicas movement between log directories. Both
> >>>>>>>>>>>>>>>>> are needed in order to support JBOD in Kafka. Please help
> >>>>>>>>>>>>>>>>> review the KIP. Your feedback is appreciated!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Grant Henke
> >>>>>>>> Software Engineer | Cloudera
> >>>>>>>> gr...@cloudera.com | twitter.com/gchenke |
> >>> linkedin.com/in/granthenke
> >>>>>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >>
>
>
