Hi all,

Thank you all for the helpful suggestions. I have updated the KIP to address
the comments received so far. See here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=8&selectedPageVersions=9>
to read the changes to the KIP. Here is a summary of the changes:

- Updated the Proposed Change section to change the recovery steps. After
this change, the broker will also create replicas as long as all log
directories are working.
- Removed kafka-log-dirs.sh from this KIP since users no longer need to use
it for recovery from bad disks.
- Explained how the znode controller_managed_state is managed in the Public
interface section.
- Explained what happens during controller failover, partition reassignment
and topic deletion in the Proposed Change section.
- Updated the Future Work section to include the following potential
improvements:
  - Let the broker notify the controller of ISR changes and disk state
changes via RPC instead of using zookeeper.
  - Handle various failure scenarios (e.g. slow disk) on a case-by-case
basis. For example, we may want to detect a slow disk and treat it as
offline.
  - Allow the admin to mark a directory as bad so that it will not be used.

Thanks,
Dong



On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Eno,
>
> Thanks much for the comment!
>
> I still think the complexity added to Kafka is justified by its benefit.
> Let me provide my reasons below.
>
> 1) The additional logic is easy to understand and thus its complexity
> should be reasonable.
>
> On the broker side, it needs to catch exceptions when accessing a log
> directory, mark the log directory and all its replicas as offline, notify
> the controller by writing to the zookeeper notification path, and specify
> the error in LeaderAndIsrResponse. On the controller side, it will listen
> to zookeeper for disk failure notifications, learn about offline replicas
> from the LeaderAndIsrResponse, and take offline replicas into consideration
> when electing leaders. It also marks a replica as created in zookeeper and
> uses that to determine whether a replica has been created.
>
> That is all the logic we need to add in Kafka. I personally feel this is
> easy to reason about.
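
To make the flow above concrete, here is a minimal Python sketch of the
broker-side and controller-side logic. It is only an illustration under
stated assumptions, not the KIP's implementation: the Broker class,
on_io_error, elect_leader, and the in-memory notifications list (a stand-in
for the zookeeper notification path) are all hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class Broker:
    # Hypothetical in-memory model: log directory -> replicas stored there.
    log_dirs: dict
    offline_dirs: set = field(default_factory=set)
    # Stand-in for the zookeeper notification path (assumption).
    notifications: list = field(default_factory=list)

    def on_io_error(self, log_dir):
        """Mark a log directory offline and notify the controller once."""
        if log_dir in self.offline_dirs:
            return
        self.offline_dirs.add(log_dir)
        self.notifications.append(log_dir)

    def offline_replicas(self):
        """Replicas that would be reported with an error to the controller."""
        return sorted(r for d in self.offline_dirs for r in self.log_dirs[d])

    def online_replicas(self):
        """Replicas on healthy directories keep being served."""
        return sorted(
            r
            for d, replicas in self.log_dirs.items()
            if d not in self.offline_dirs
            for r in replicas
        )


def elect_leader(assigned, isr, offline):
    """Pick the first assigned replica that is in the ISR and not offline."""
    for broker_id in assigned:
        if broker_id in isr and broker_id not in offline:
            return broker_id
    return None
```

In this sketch a failure of one directory leaves the replicas on the other
directories online, which is the availability property the KIP is after.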
>
> 2) The additional code is not much.
>
> I expect the code for KIP-112 to be around 1100 lines of new code. Previously
> I have implemented a prototype of a slightly different design (see here
> <https://docs.google.com/document/d/1Izza0SBmZMVUBUt9s_-Dqi3D8e0KGJQYW8xgEdRsgAI/edit>)
> and uploaded it to github (see here
> <https://github.com/lindong28/kafka/tree/JBOD>). The patch changed 33
> files, added 1185 lines and deleted 183 lines. The size of the prototype
> patch is actually smaller than the patch of KIP-107 (see here
> <https://github.com/apache/kafka/pull/2476>), which is already accepted.
> The KIP-107 patch changed 49 files, added 1349 lines and deleted 141 lines.
>
> 3) Comparison with one-broker-per-multiple-volumes
>
> This KIP can improve the availability of Kafka in this case such that one
> failed volume doesn't bring down the entire broker.
>
> 4) Comparison with one-broker-per-volume
>
> If each volume maps to multiple disks, then we still have a similar
> problem: the broker will fail if any disk of the volume fails.
>
> If each volume maps to one disk, it means that we need to deploy 10
> brokers on a machine if the machine has 10 disks. I will explain the
> concerns with this approach in order of importance.
>
> - It is weird if we were to tell Kafka users to deploy 50 brokers on a
> machine with 50 disks.
>
> - Whether users deploy Kafka on a commercial cloud platform or on their
> own cluster, the size of the largest disk is usually limited. There will
> be scenarios where users want to increase broker capacity by having
> multiple disks per broker. This JBOD KIP makes that feasible without
> hurting availability due to a single disk failure.
>
> - Automatic load rebalance across disks will be easier and more flexible
> if one broker has multiple disks. This can be future work.
>
> - There is a performance concern when you deploy 10 brokers vs. 1 broker
> on one machine. The number of requests and responses in the cluster,
> including FetchRequest, ProduceResponse, MetadataRequest and so on, will
> all be 10X more. The packets-per-second will be 10X higher, which may limit
> performance if pps is the bottleneck. The number of sockets on the machine
> is 10X higher. And the number of replication threads will be 100X more.
> The impact will be more significant with an increasing number of disks per
> machine. Thus it will limit Kafka's scalability in the long term.
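
The 10X and 100X figures above follow from simple counting. Here is a small
Python sketch, assuming one broker per disk and that every broker on one
machine may replicate from every broker on another machine; the function
name and the returned keys are hypothetical and only for illustration.

```python
def one_broker_per_disk_overhead(disks_per_machine):
    """Growth factors relative to running a single broker per machine."""
    brokers = disks_per_machine  # one broker per disk (assumption)
    return {
        # Each broker sends and receives its own requests and sockets.
        "requests_factor": brokers,
        "sockets_factor": brokers,
        # Broker pairs between two machines: brokers x brokers.
        "replication_pairs_factor": brokers * brokers,
    }
```

With 10 disks per machine this gives factors of 10, 10 and 100, and the
pairwise term grows quadratically as machines get more disks.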
>
> Thanks,
> Dong
>
>
> On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi Dong,
>>
>> To simplify the discussion today, on my part I'll zoom into one thing
>> only:
>>
>> - I'll discuss the options referred to below as "one-broker-per-disk" or
>> "one-broker-per-few-disks".
>>
>> - I completely buy the JBOD vs RAID arguments so there is no need to
>> discuss that part for me. I buy it that JBODs are good.
>>
>> I find the terminology can be improved a bit. Ideally we'd be talking
>> about volumes, not disks. Just to make it clear that Kafka understands
>> volumes/directories, not individual raw disks. So by
>> "one-broker-per-few-disks" what I mean is that the admin can pool a few
>> disks together to create a volume/directory and give that to Kafka.
>>
>>
>> The kernel of my question will be that the admin already has tools to 1)
>> create volumes/directories from a JBOD and 2) start a broker on a desired
>> machine and 3) assign a broker resources like a directory. I claim that
>> those tools are sufficient to optimise resource allocation.  I understand
>> that a broker could manage point 3) itself, ie juggle the directories. My
>> question is whether the complexity added to Kafka is justified.
>> Operationally it seems to me an admin will still have to do all the three
>> items above.
>>
>> Looking forward to the discussion
>> Thanks
>> Eno
>>
>>
>> > On 1 Feb 2017, at 17:21, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > Hey Eno,
>> >
>> > Thanks much for the review.
>> >
>> > I think your suggestion is to split the disks of a machine into multiple
>> > disk sets and run one broker per disk set. Yeah, this is similar to
>> > Colin's suggestion of one-broker-per-disk, which we have evaluated at
>> > LinkedIn and consider to be a good short term approach.
>> >
>> > As of now I don't think either of these approaches is a better
>> > alternative in the long term. I will summarize the reasons here. I have
>> > put these reasons in the KIP's motivation section and rejected
>> > alternative section. I am happy to discuss more and I would certainly
>> > like to use an alternative solution that is easier to do with better
>> > performance.
>> >
>> > - JBOD vs. RAID-10: if we switch from RAID-10 with replication-factor=2
>> > to JBOD with replication-factor=3, we get a 25% reduction in disk usage
>> > and double the tolerance of broker failures before data unavailability
>> > from 1 to 2. This is a pretty huge gain for any company that uses Kafka
>> > at large scale.
>> >
>> > - JBOD vs. one-broker-per-disk: The benefit of one-broker-per-disk is
>> > that no major code change is needed in Kafka. Among the disadvantages of
>> > one-broker-per-disk summarized in the KIP and the previous email with
>> > Colin, the biggest ones are the 15% throughput loss compared to JBOD and
>> > less flexibility to balance across disks. Further, it probably requires
>> > changes to internal deployment tools at various companies to deal with
>> > the one-broker-per-disk setup.
>> >
>> > - JBOD vs. RAID-0: This is the setup used at Microsoft. The problem is
>> > that a broker becomes unavailable if any disk fails. Suppose
>> > replication-factor=2 and there are 10 disks per machine. Then the
>> > probability of any message becoming unavailable due to disk failure with
>> > RAID-0 is 100X higher than that with JBOD.
>> >
>> > - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disks is
>> > somewhere between one-broker-per-disk and RAID-0. So it carries an
>> > average of the disadvantages of these two approaches.
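
The 25% and 100X figures in the comparison above can be reproduced with a
short Python sketch. The assumptions here are mine, not from the KIP: RAID-10
stores each replica twice (mirroring), disks fail independently with a small
probability p, and with replication-factor=2 a message is unavailable only
when both of its replicas are lost. The function names are hypothetical.

```python
def physical_copies(replication_factor, mirrored):
    """Physical copies of each byte; RAID-10 mirrors every replica once."""
    return replication_factor * (2 if mirrored else 1)


def disk_usage_reduction():
    """Relative saving when moving from RAID-10 (RF=2) to JBOD (RF=3)."""
    raid10 = physical_copies(2, mirrored=True)   # 4 copies
    jbod = physical_copies(3, mirrored=False)    # 3 copies
    return 1 - jbod / raid10


def unavailability_ratio(p, disks_per_broker):
    """How much more likely a message is unavailable with RAID-0 than JBOD."""
    # RAID-0: a broker is down if any one of its disks fails.
    broker_down = 1 - (1 - p) ** disks_per_broker
    raid0 = broker_down ** 2   # both replica brokers down
    # JBOD: only the two specific disks holding the replicas matter.
    jbod = p ** 2
    return raid0 / jbod
```

For small p the ratio approaches disks_per_broker squared, which is where
the 100X for 10 disks per machine comes from.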
>> >
>> > To answer your question, I think it is reasonable to manage disks in
>> > Kafka. By "managing disks" we mean managing the assignment of replicas
>> > across disks. Here are my reasons in more detail:
>> >
>> > - I don't think this KIP is a big step change. By allowing users to
>> > configure Kafka to run with multiple log directories or disks as of now,
>> > it is implicit that Kafka manages disks. It is just not a complete
>> > feature. Microsoft and probably other companies are using this feature
>> > despite the undesirable effect that a broker will fail if any disk
>> > fails. It is good to complete this feature.
>> >
>> > - I think it is reasonable to manage disks in Kafka. One of the most
>> > important things Kafka does is to determine the replica assignment
>> > across brokers and make sure enough copies of a given replica are
>> > available. I would argue that this is conceptually not much different
>> > from determining the replica assignment across disks.
>> >
>> > - I would agree that this KIP improves the performance of Kafka at the
>> > cost of more complexity inside Kafka, by switching from RAID-10 to JBOD.
>> > I would argue that this is the right direction. If we can gain 20%+
>> > performance by managing NICs in Kafka as compared to the existing
>> > approach and other alternatives, I would say we should just do it. Such
>> > a gain in performance, or equivalently reduction in cost, can save
>> > millions of dollars per year for any company running Kafka at large
>> > scale.
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>> >
>> >> I'm coming somewhat late to the discussion, apologies for that.
>> >>
>> >> I'm worried about this proposal. It's moving Kafka to a world where it
>> >> manages disks. So in a sense, the scope of the KIP is limited, but the
>> >> direction it sets for Kafka is quite a big step change. Fundamentally
>> >> this is about balancing resources for a Kafka broker. This can be done
>> >> by a tool, rather than by changing Kafka. E.g., the tool would take a
>> >> bunch of disks together, create a volume over them and export that to a
>> >> Kafka broker (in addition to setting the memory limits for that broker
>> >> or limiting other resources). A different bunch of disks can then make
>> >> up a second volume, and be used by another Kafka broker. This is aligned
>> >> with what Colin is saying (as I understand it).
>> >>
>> >> Disks are not the only resource on a machine; there are several
>> >> instances where multiple NICs are used, for example. Do we want
>> >> fine-grained management of all these resources? I'd argue that opens up
>> >> the system to a lot of complexity.
>> >>
>> >> Thanks
>> >> Eno
>> >>
>> >>
>> >>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote:
>> >>>
>> >>> Hi all,
>> >>>
>> >>> I am going to initiate the vote if there is no further concern with
>> >>> the KIP.
>> >>>
>> >>> Thanks,
>> >>> Dong
>> >>>
>> >>>
>> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai <radai.rosenbl...@gmail.com>
>> >> wrote:
>> >>>
>> >>>> a few extra points:
>> >>>>
>> >>>> 1. broker per disk might also incur more client <--> broker sockets:
>> >>>> suppose every producer / consumer "talks" to >1 partition, there's a
>> >>>> very good chance that partitions that were co-located on a single
>> >>>> 10-disk broker would now be split between several single-disk broker
>> >>>> processes on the same machine. hard to put a multiplier on this, but
>> >>>> likely >x1. sockets are a limited resource at the OS level and incur
>> >>>> some memory cost (kernel buffers)
>> >>>>
>> >>>> 2. there's a memory overhead to spinning up a JVM (compiled code and
>> >>>> byte code objects etc). if we assume this overhead is ~300 MB (order of
>> >>>> magnitude, specifics vary) then spinning up 10 JVMs would lose you 3 GB
>> >>>> of RAM. not a ton, but non negligible.
>> >>>>
>> >>>> 3. there would also be some overhead downstream of kafka in any
>> >>>> management / monitoring / log aggregation system. likely less than
>> >>>> x10 though.
>> >>>>
>> >>>> 4. (related to above) - added complexity of administration with more
>> >>>> running instances.
>> >>>>
>> >>>> is anyone running kafka with anywhere near 100GB heaps? i thought the
>> >>>> point was to rely on kernel page cache to do the disk buffering ....
>> >>>>
>> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> >>>>
>> >>>>> Hey Colin,
>> >>>>>
>> >>>>> Thanks much for the comment. Please see my comments inline.
>> >>>>>
>> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org>
>> >>>> wrote:
>> >>>>>
>> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>> >>>>>>> Hey Colin,
>> >>>>>>>
>> >>>>>>> Good point! Yeah, we have actually considered and tested this
>> >>>>>>> solution, which we call one-broker-per-disk. It would work and
>> >>>>>>> should require no major change in Kafka as compared to this JBOD
>> >>>>>>> KIP. So it would be a good short term solution.
>> >>>>>>>
>> >>>>>>> But it has a few drawbacks which make it less desirable in the
>> >>>>>>> long term. Assume we have 10 disks on a machine. Here are the
>> >>>>>>> problems:
>> >>>>>>
>> >>>>>> Hi Dong,
>> >>>>>>
>> >>>>>> Thanks for the thoughtful reply.
>> >>>>>>
>> >>>>>>>
>> >>>>>>> 1) Our stress test result shows that one-broker-per-disk has 15%
>> >>>>>>> lower throughput
>> >>>>>>>
>> >>>>>>> 2) Controller would need to send 10X as many LeaderAndIsrRequest,
>> >>>>>>> MetadataUpdateRequest and StopReplicaRequest. This increases the
>> >>>>>>> burden on the controller, which can be the performance bottleneck.
>> >>>>>>
>> >>>>>> Maybe I'm misunderstanding something, but there would not be 10x as
>> >>>>>> many StopReplicaRequest RPCs, would there?  The other requests would
>> >>>>>> increase 10x, but from a pretty low base, right?  We are not
>> >>>>>> reassigning partitions all the time, I hope (or else we have bigger
>> >>>>>> problems...)
>> >>>>>>
>> >>>>>
>> >>>>> I think the controller will group StopReplicaRequest per broker and
>> >>>>> send only one StopReplicaRequest to a broker during controlled
>> >>>>> shutdown. Anyway, we don't have to worry about this if we agree that
>> >>>>> other requests will increase by 10X. One MetadataRequest needs to be
>> >>>>> sent to each broker in the cluster every time there is a leadership
>> >>>>> change. I am not sure this is a real problem. But in theory this makes
>> >>>>> the overhead complexity O(number of brokers) and may be a concern in
>> >>>>> the future. Ideally we should avoid it.
>> >>>>>
>> >>>>>
>> >>>>>>
>> >>>>>>>
>> >>>>>>> 3) Less efficient use of physical resources on the machine. The
>> >>>>>>> number of sockets on each machine will increase by 10X. The number
>> >>>>>>> of connections between any two machines will increase by 100X.
>> >>>>>>>
>> >>>>>>> 4) Less efficient way to manage memory and quota.
>> >>>>>>>
>> >>>>>>> 5) Rebalancing between disks/brokers on the same machine will be
>> >>>>>>> less efficient and less flexible. A broker has to read data from
>> >>>>>>> another broker on the same machine via a socket. It is also harder
>> >>>>>>> to do automatic load balancing between disks on the same machine in
>> >>>>>>> the future.
>> >>>>>>>
>> >>>>>>> I will put this and the explanation in the rejected alternative
>> >>>>>>> section. I have a few questions:
>> >>>>>>>
>> >>>>>>> - Can you explain why this solution can help avoid scalability
>> >>>>>>> bottlenecks? I actually think it will exacerbate the scalability
>> >>>>>>> problem due to 2) above.
>> >>>>>>> - Why can we push more RPCs with this solution?
>> >>>>>>
>> >>>>>> To really answer this question we'd have to take a deep dive into
>> >>>>>> the locking of the broker and figure out how effectively it can
>> >>>>>> parallelize truly independent requests.  Almost every multithreaded
>> >>>>>> process is going to have shared state, like shared queues or shared
>> >>>>>> sockets, that is going to make scaling less than linear when you add
>> >>>>>> disks or processors.  (And clearly, another option is to improve that
>> >>>>>> scalability, rather than going multi-process!)
>> >>>>>>
>> >>>>>
>> >>>>> Yeah, I also think it is better to improve scalability inside the
>> >>>>> Kafka code if possible. I am not sure we currently have any
>> >>>>> scalability issue inside Kafka that cannot be removed without using
>> >>>>> multi-process.
>> >>>>>
>> >>>>>
>> >>>>>>
>> >>>>>>> - It is true that a garbage collection in one broker would not
>> >>>>>>> affect others. But that is after every broker only uses 1/10 of the
>> >>>>>>> memory. Can we be sure that this will actually help performance?
>> >>>>>>
>> >>>>>> The big question is, how much memory do Kafka brokers use now, and
>> >>>>>> how much will they use in the future?  Our experience in HDFS was
>> >>>>>> that once you start getting more than 100-200GB Java heap sizes, full
>> >>>>>> GCs start taking minutes to finish when using the standard JVMs.
>> >>>>>> That alone is a good reason to go multi-process or consider storing
>> >>>>>> more things off the Java heap.
>> >>>>>>
>> >>>>>
>> >>>>> I see. Now I agree one-broker-per-disk should be more efficient in
>> >>>>> terms of GC since each broker probably needs less than 1/10 of the
>> >>>>> memory available on a typical machine nowadays. I will remove this
>> >>>>> from the reasons for rejection.
>> >>>>>
>> >>>>>
>> >>>>>>
>> >>>>>> Disk failure is the "easy" case.  The "hard" case, which is
>> >>>>>> unfortunately also the much more common case, is disk misbehavior.
>> >>>>>> Towards the end of their lives, disks tend to start slowing down
>> >>>>>> unpredictably.  Requests that would have completed immediately
>> >>>>>> before start taking 20, 100, or 500 milliseconds.  Some files may be
>> >>>>>> readable and other files may not be.  System calls hang, sometimes
>> >>>>>> forever, and the Java process can't abort them, because the hang is
>> >>>>>> in the kernel.  It is not fun when threads are stuck in "D state"
>> >>>>>> http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state
>> >>>>>> .  Even kill -9 cannot abort the thread then.  Fortunately, this is
>> >>>>>> rare.
>> >>>>>>
>> >>>>>
>> >>>>> I agree it is a harder problem and it is rare. We probably don't have
>> >>>>> to worry about it in this KIP since this issue is orthogonal to
>> >>>>> whether or not we use JBOD.
>> >>>>>
>> >>>>>
>> >>>>>>
>> >>>>>> Another approach we should consider is for Kafka to implement its
>> >>>>>> own storage layer that would stripe across multiple disks.  This
>> >>>>>> wouldn't have to be done at the block level, but could be done at the
>> >>>>>> file level.  We could use consistent hashing to determine which disks
>> >>>>>> a file should end up on, for example.
>> >>>>>>
>> >>>>>
>> >>>>> Are you suggesting that we should distribute log, or log segment,
>> >>>>> across disks of brokers? I am not sure if I fully understand this
>> >>>>> approach. My gut feel is that this would be a drastic solution that
>> >>>>> would require non-trivial design. While this may be useful to Kafka, I
>> >>>>> would prefer not to discuss this in detail in this thread unless you
>> >>>>> believe it is strictly superior to the design in this KIP in terms of
>> >>>>> solving our use-case.
>> >>>>>
>> >>>>>
>> >>>>>> best,
>> >>>>>> Colin
>> >>>>>>
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Dong
>> >>>>>>>
>> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <
>> cmcc...@apache.org>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Dong,
>> >>>>>>>>
>> >>>>>>>> Thanks for the writeup!  It's very interesting.
>> >>>>>>>>
>> >>>>>>>> I apologize in advance if this has been discussed somewhere else.
>> >>>>>>>> But I am curious if you have considered the solution of running
>> >>>>>>>> multiple brokers per node.  Clearly there is a memory overhead with
>> >>>>>>>> this solution because of the fixed cost of starting multiple JVMs.
>> >>>>>>>> However, running multiple JVMs would help avoid scalability
>> >>>>>>>> bottlenecks.  You could probably push more RPCs per second, for
>> >>>>>>>> example.  A garbage collection in one broker would not affect the
>> >>>>>>>> others.  It would be interesting to see this considered in the
>> >>>>>>>> "alternate designs" design, even if you end up deciding it's not
>> >>>>>>>> the way to go.
>> >>>>>>>>
>> >>>>>>>> best,
>> >>>>>>>> Colin
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>> >>>>>>>>> Hi all,
>> >>>>>>>>>
>> >>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find
>> >>>>>>>>> the KIP wiki in the link
>> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
>> >>>>>>>>>
>> >>>>>>>>> This KIP is related to KIP-113
>> >>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
>> >>>>>>>>> Support replicas movement between log directories. They are
>> >>>>>>>>> needed in order to support JBOD in Kafka. Please help review the
>> >>>>>>>>> KIP. Your feedback is appreciated!
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Dong
>> >>>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>
>> >>
>>
>>
>
