Sorry for the typo. I meant that before the KIP meeting, please feel free to
provide comments in this email thread so that the discussion in the KIP meeting
can be more efficient.

On Wed, Feb 1, 2017 at 6:53 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Eno, Colin,
>
> Would you have time next Tuesday morning to discuss the KIP? How about 10
> - 11 am?
>
> To make best use of our time, can you please invite one or more committers
> from Confluent to join the meeting? I hope the KIP can receive one or more
> +1s from committers at Confluent if we have no concerns about the KIP after
> the KIP meeting.
>
> In the meeting time, please feel free to provide comment in the thread so
> that discussion in the KIP meeting can be more efficient.
>
> Thanks,
> Dong
>
> On Wed, Feb 1, 2017 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:
>
>> Hey Colin,
>>
>> Thanks much for the comment. Please see my reply inline.
>>
>> On Wed, Feb 1, 2017 at 1:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
>>
>>> On Wed, Feb 1, 2017, at 11:35, Dong Lin wrote:
>>> > Hey Grant, Colin,
>>> >
>>> > My bad, I misunderstood Grant's suggestion initially. Indeed this is a
>>> > very
>>> > interesting idea to just wait for replica.max.lag.ms for the replica
>>> on
>>> > the
>>> > bad disk to drop out of ISR instead of having broker actively reporting
>>> > this to the controller.
>>> >
>>> > I have several concerns with this approach.
>>> >
>>> > - Broker needs to persist the information of all partitions that
>>> > it has created, in a file on each disk. This is needed for broker to know
>>> > the replicas already created on the bad disks that it can not access. If
>>> > we don't do it, then when controller sends LeaderAndIsrRequest to a broker
>>> > to become follower for a partition on the bad disk, the broker will create
>>> > partition on a good disk. The good disks may be overloaded as a cascading
>>> > effect.
>>> >
>>> > While it is possible to let broker keep track of the replicas that it has
>>> > created, I think it is less clean than the approach in the current KIP
>>> > for the reasons provided in the rejected alternatives section.
>>> >
>>> > - Change is needed in the controller logic to handle failure to make a
>>> > broker the leader when controller receives LeaderAndIsrResponse.
>>> > Otherwise, things go wrong if partition on the bad disk is requested to
>>> > become leader.
>>> > As of now, broker doesn't handle error in LeaderAndIsrResponse.
>>> >
>>> > - We still need tools and mechanism for administrator to know whether a
>>> > replica is offline due to bad disk. I am worried that asking
>>> > administrator
>>> > to log into a machine and get this information in the log is not
>>> scalable
>>> > when the broker number is large. Although each company can develop
>>> their
>>> > internal tools to get this information, it is a waste of developer time
>>> > to
>>> > reinvent the wheel. Reading this information in the log also seems less
>>> > reliable than getting it from Kafka request/response.
>>> >
>>> > I guess the goal of this alternative approach is to avoid making major
>>> > change in Kafka at the cost of increased disk failure discovery time
>>> etc.
>>> > But I think the changes required for fixing the problems above won't be
>>> > much less.
>>>
>>> Thanks for the thoughtful replies, Dong L.
>>>
>>> Instead of having an "offline" state, how about having a "creating"
>>> state for replicas and a "created" state?  Then if a replica was not
>>> accessible on any disk, but still in "created" state, the broker could
>>> know that something had gone wrong.  This also would catch issues like
>>> the broker being started without all log directories configured, or
>>> disks not being correctly mounted at the expected mount points, leading
>>> to empty directories.
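
The "created" state check described above can be sketched roughly as follows. This is only an illustration in Python, not Kafka code: the function name, the replica naming (`topic-partition` strings), and the directory paths are all hypothetical, and where the "created" set is stored is left open.

```python
from typing import Dict, Set


def find_missing_replicas(created: Set[str],
                          found_on_disk: Dict[str, Set[str]]) -> Set[str]:
    """Return replicas recorded as "created" that no log directory contains.

    `created` is the set of replicas the broker is known to have created.
    `found_on_disk` maps each configured log directory to the replicas
    actually found there at startup (both are hypothetical inputs).
    """
    present: Set[str] = set().union(*found_on_disk.values())
    return created - present


# Example: one directory is mounted but empty, e.g. a wrong mount point.
created = {"topicA-0", "topicA-1", "topicB-0"}
found = {"/data/disk1": {"topicA-0"}, "/data/disk2": set()}
missing = find_missing_replicas(created, found)
```

A non-empty result would indicate that something has gone wrong (bad disk, missing log directory in the config, or an unmounted disk) rather than that the replicas were never created.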
>>>
>>
>> Indeed, we need to have an additional state per replica to solve this
>> problem. The current KIP design addresses the problem by putting the
>> "created" state in zookeeper, as you can see in the public interface change
>> of the KIP. Are you suggesting to solve the problem by storing this
>> information in local disk of the broker instead of zookeeper? I have two
>> concerns with this approach:
>>
>> - It requires broker to keep track of the replicas it has created. This
>> solution will split the task of determining offline replicas among
>> controller and brokers as opposed to the current Kafka design, where the
>> controller determines states of replicas and propagates this information to
>> brokers. We think it is less error-prone to still let controller be the
>> only entity that maintains metadata (e.g. replica state) of Kafka cluster.
>>
>> - If we store this information in local disk, then we need to have
>> additional request/response protocol in order to request broker to reset
>> this information, e.g. after a bad disk is replaced with good disk, so that
>> the replica can be re-created on a good disk. Things would be easier if we
>> store this information in zookeeper.
>>
>>
>>>
>>> >
>>> > To answer Colin's questions:
>>> >
>>> > - There is no action required on the side of administrator in case of
>>> log
>>> > directory failure.
>>> >
>>> > - Broker itself is going to discover log directory failure and declare
>>> > offline replicas. Broker doesn't explicitly declare log directory
>>> > failure.
>>> > But administrator can learn from the MetadataResponse that replica is
>>> > offline due to disk failure, i.e. if replica is offline but broker is
>>> > online.
>>>
>>> Can you expand on this a little bit?  It sounds like you are considering
>>> dealing with failures on a replica-by-replica basis, rather than a
>>> disk-by-disk basis.  But it's disks that fail, not really individual
>>> files or directories on disks.  This decision interacts poorly with the
>>> lack of a periodic scanner.  It's easy to imagine a scenario where an
>>> infrequently used replica sits on a dead disk for a long time without us
>>> declaring it dead.
>>>
>>
>> Sure. The broker will deal with failures on a disk-by-disk basis.
>> All replicas on a disk will be marked as failed if there is any disk-related
>> exception when broker accesses that disk. It means that if any replica on that
>> disk can not be read, then all replicas on that disk are considered offline.
>> Since controller doesn't know the disk names of replicas, it has to learn
>> the liveness of replicas on a replica-by-replica basis in order to do
>> leader election.
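
A minimal sketch of the disk-by-disk handling described above, in Python for brevity. The class and method names are hypothetical and do not correspond to actual Kafka classes; the point is only that a single storage error on a directory takes every replica on that directory offline, and that the result is reported per replica.

```python
from typing import Dict, Iterable, Set


class LogDirTracker:
    """Hypothetical tracker of which replicas live in which log directory."""

    def __init__(self, replicas_by_dir: Dict[str, Iterable[str]]) -> None:
        self.replicas_by_dir = {d: set(r) for d, r in replicas_by_dir.items()}
        self.offline_dirs: Set[str] = set()

    def on_storage_error(self, log_dir: str) -> Set[str]:
        """Mark the whole directory offline after any disk-related error.

        Returns every replica on that directory, i.e. the replicas that
        would need to be reported as offline.
        """
        self.offline_dirs.add(log_dir)
        return set(self.replicas_by_dir.get(log_dir, set()))

    def offline_replicas(self) -> Set[str]:
        """All replicas currently considered offline on this broker."""
        result: Set[str] = set()
        for log_dir in self.offline_dirs:
            result |= self.replicas_by_dir.get(log_dir, set())
        return result


tracker = LogDirTracker({"/d1": ["t-0", "t-1"], "/d2": ["t-2"]})
newly_offline = tracker.on_storage_error("/d1")  # error seen while reading t-0
```

Replicas on `/d2` stay online in this sketch, which is the difference from crashing the whole broker.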
>>
>> Besides, I don't think we will have problem with the scenario you
>> described. If a replica is indeed not touched for a long time, then it
>> doesn't matter whether it is considered dead or not. The moment it is
>> needed, either for read or for write, the KIP makes sure that we know its
>> state and perform leader election accordingly.
>>
>>
>>> >
>>> > - This KIP does not handle cases where a few disks on a broker are
>>> full,
>>> > but the others have space. If a disk is full and can not be written
>>> then
>>> > the disk is considered to have failed. The imbalance across disks is an
>>> > existing problem and will be handled in KIP-113.
>>>
>>> OK.
>>>
>>> >
>>> > - This KIP does not do a disk scanner that will periodically check for
>>> > error conditions. It doesn't handle any performance degradation of
>>> disks.
>>> > We wait for a failure to happen before declaring a disk bad.
>>> >
>>> > Yes, this KIP requires us to fix cases in the code where we are
>>> > suppressing
>>> > disk errors or ignoring their root cause. But decision of which
>>> Exception
>>> > should be considered disk failure and how to handle each of these are
>>> > more
>>> > like implementation detail. I hope we can focus on the detail and high
>>> > level idea of this KIP and only worry about specific exception when the
>>> > patch is being reviewed.
>>>
>>> Hmm... I think we should discuss how we are going to harden the code
>>> against disk failures, and verify that it has been hardened.  Or maybe
>>> we could do this in a follow-up KIP.
>>>
>>
>> By "harden the code against disk errors" do you mean that we should make
>> a full list of disk-related exceptions we may see and decide if we should
>> treat each of these differently? I agree it is useful in the long term. But
>> this is actually out of the scope of this KIP. The KIP will re-use the
>> existing KafkaStorageException without having to change what exceptions are
>> considered KafkaStorageException. The goal is to fail replicas on a disk
>> instead of crashing the broker when KafkaStorageException is observed.
>>
>> Thanks,
>> Dong
>>
>>
>>> > After all we probably only know the list of
>>> > exceptions and ways to handle them when we start to implement the KIP.
>>> > And
>>> > we need to improve this list over time as we discover various failures
>>> > in the deployment.
>>> >
>>> >
>>> > Hey Eno,
>>> >
>>> > Sure thing. Thanks for offering time to have a KIP meeting to discuss
>>> > this.
>>> > I will ask other Kafka developers at LinkedIn about their availability.
>>>
>>> Yeah, it would be nice to talk about this.
>>
>>
>>> regards,
>>> Colin
>>>
>>>
>>> >
>>> > Thanks,
>>> > Dong
>>> >
>>> >
>>> > On Wed, Feb 1, 2017 at 10:37 AM, Eno Thereska <eno.there...@gmail.com>
>>> > wrote:
>>> >
>>> > > Hi Dong,
>>> > >
>>> > > Would it make sense to do a discussion over video/voice about this? I
>>> > > think it's sufficiently complex that we can probably make quicker
>>> progress
>>> > > that way? So shall we do a KIP meeting soon? I can do this week
>>> (Thu/Fri)
>>> > > or next week.
>>> > >
>>> > > Thanks
>>> > > Eno
>>> > > > On 1 Feb 2017, at 18:29, Colin McCabe <cmcc...@apache.org> wrote:
>>> > > >
>>> > > > Hmm.  Maybe I misinterpreted, but I got the impression that Grant
>>> was
>>> > > > suggesting that we avoid introducing this concept of "offline
>>> replicas"
>>> > > > for now.  Is that feasible?
>>> > > >
>>> > > > What is the strategy for declaring a log directory bad?  Is it an
>>> > > > administrative action?  Or is the broker itself going to be
>>> responsible
>>> > > > for this?  How do we handle cases where a few disks on a broker are
>>> > > > full, but the others have space?
>>> > > >
>>> > > > Are we going to have a disk scanner that will periodically check
>>> for
>>> > > > error conditions (similar to the background checks that RAID
>>> controllers
>>> > > > do)?  Or will we wait for a failure to happen before declaring a
>>> disk
>>> > > > bad?
>>> > > >
>>> > > > It seems to me that if we want this to work well we will need to
>>> fix
>>> > > > cases in the code where we are suppressing disk errors or ignoring
>>> their
>>> > > > root cause.  For example, any place where we are using the old
>>> Java APIs
>>> > > > that just return a boolean on failure will need to be fixed, since
>>> the
>>> > > > failure could now be disk full, permission denied, or IOE, and we
>>> will
>>> > > > need to handle those cases differently.  Also, we will need to
>>> harden
>>> > > > the code against disk errors.  Formerly it was OK to just crash on
>>> a
>>> > > > disk error; now it is not.  It would be nice to see more in the
>>> test
>>> > > > plan about injecting IOExceptions into disk handling code and
>>> verifying
>>> > > > that we can handle it correctly.
>>> > > >
>>> > > > regards,
>>> > > > Colin
>>> > > >
>>> > > >
>>> > > > On Wed, Feb 1, 2017, at 10:02, Dong Lin wrote:
>>> > > >> Hey Grant,
>>> > > >>
>>> > > >> Yes, this KIP does exactly what you described:)
>>> > > >>
>>> > > >> Thanks,
>>> > > >> Dong
>>> > > >>
>>> > > >> On Wed, Feb 1, 2017 at 9:45 AM, Grant Henke <ghe...@cloudera.com>
>>> > > wrote:
>>> > > >>
>>> > > >>> Hi Dong,
>>> > > >>>
>>> > > >>> Thanks for putting this together.
>>> > > >>>
>>> > > >>> Since we are discussing alternative/simplified options, have you considered
>>> > > >>> handling the disk failures broker side to prevent a crash,
>>> marking the
>>> > > disk
>>> > > >>> as "bad" to that individual broker, and continuing as normal? I
>>> > > imagine the
>>> > > >>> broker would then fall out of sync for the replicas hosted on
>>> the bad
>>> > > disk
>>> > > >>> and the ISR would shrink. This would allow people using min.isr
>>> to keep
>>> > > >>> their data safe and the cluster operators would see a shrink in
>>> many
>>> > > ISRs
>>> > > >>> and hopefully an obvious log message leading to a quick fix. I
>>> haven't
>>> > > >>> thought through this idea in depth though. So there could be some
>>> > > >>> shortfalls.
>>> > > >>>
>>> > > >>> Thanks,
>>> > > >>> Grant
>>> > > >>>
>>> > > >>>
>>> > > >>>
>>> > > >>> On Wed, Feb 1, 2017 at 11:21 AM, Dong Lin <lindon...@gmail.com>
>>> wrote:
>>> > > >>>
>>> > > >>>> Hey Eno,
>>> > > >>>>
>>> > > >>>> Thanks much for the review.
>>> > > >>>>
>>> > > >>>> I think your suggestion is to split disks of a machine into
>>> multiple
>>> > > disk
>>> > > >>>> sets and run one broker per disk set. Yeah this is similar to
>>> Colin's
>>> > > >>>> suggestion of one-broker-per-disk, which we have evaluated at
>>> LinkedIn
>>> > > >>> and
>>> > > >>>> considered it to be a good short term approach.
>>> > > >>>>
>>> > > >>>> As of now I don't think any of these approaches is a better alternative in
>>> > > >>>> the long term. I will summarize these here. I have put these
>>> reasons
>>> > > in
>>> > > >>> the
>>> > > >>>> KIP's motivation section and rejected alternative section. I am
>>> happy
>>> > > to
>>> > > >>>> discuss more and I would certainly like to use an alternative
>>> solution
>>> > > >>> that
>>> > > >>>> is easier to do with better performance.
>>> > > >>>>
>>> > > >>>> - JBOD vs. RAID-10: if we switch from RAID-10 with
>>> > > >>>> replication-factor=2 to JBOD with replication-factor=3, we get a 25%
>>> > > >>>> reduction in disk usage and double the tolerance of broker failure
>>> > > >>>> before data unavailability from 1 to 2. This is a pretty huge gain for
>>> > > >>>> any company that uses Kafka at large scale.
>>> > > >>>>
>>> > > >>>> - JBOD vs. one-broker-per-disk: The benefit of one-broker-per-disk is
>>> > > >>>> that no major code change is needed in Kafka. Among the disadvantages of
>>> > > >>>> one-broker-per-disk summarized in the KIP and previous email with Colin,
>>> > > >>>> the biggest one is the 15% throughput loss compared to JBOD and less
>>> > > >>>> flexibility to balance across disks. Further, it probably requires change
>>> > > >>>> to internal deployment tools at various companies to deal with
>>> > > >>>> one-broker-per-disk setup.
>>> > > >>>>
>>> > > >>>> - JBOD vs. RAID-0: This is the setup that is used at Microsoft. The
>>> > > >>>> problem is that a broker becomes unavailable if any disk fails. Suppose
>>> > > >>>> replication-factor=2 and there are 10 disks per machine. Then the
>>> > > >>>> probability of any message becoming unavailable due to disk failure
>>> > > >>>> with RAID-0 is 100X higher than that with JBOD.
>>> > > >>>>
>>> > > >>>> - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disks is
>>> > > >>>> somewhere between one-broker-per-disk and RAID-0. So it carries the
>>> > > >>>> averaged disadvantages of these two approaches.
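
As a rough check of the 25% and 100X figures in the comparison above, here is a back-of-the-envelope sketch in Python. It rests on assumptions that are not stated in the thread: RAID-10 stores every block twice, disks fail independently with the same small probability `p`, and a message is unavailable only when all of its replicas are unavailable. The function names are made up for illustration.

```python
def physical_copies(replication_factor: int, raid10: bool) -> int:
    """Number of on-disk copies of each message.

    Assumption: RAID-10 mirrors every block, so each replica is stored twice.
    """
    return replication_factor * (2 if raid10 else 1)


raid10_copies = physical_copies(2, raid10=True)   # 4 copies
jbod_copies = physical_copies(3, raid10=False)    # 3 copies
disk_saving = 1 - jbod_copies / raid10_copies     # 0.25, i.e. 25% less disk


def unavailability_ratio(p: float, disks_per_broker: int,
                         replication_factor: int) -> float:
    """Ratio of P(message unavailable) with RAID-0 to that with JBOD.

    RAID-0: a replica is lost if any disk on its broker fails.
    JBOD:   a replica is lost only if the one disk holding it fails.
    """
    raid0_replica = 1 - (1 - p) ** disks_per_broker
    jbod_replica = p
    return (raid0_replica / jbod_replica) ** replication_factor


# With 10 disks per machine and replication-factor=2 the ratio is about 99,
# which approaches 10 ** 2 = 100 as p gets smaller.
ratio = unavailability_ratio(p=0.001, disks_per_broker=10, replication_factor=2)
```

Under these assumptions the broker-failure tolerance is simply replication-factor minus one, which gives the "from 1 to 2" figure.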
>>> > > >>>>
>>> > > >>>> To answer your question, I think it is reasonable to manage disks
>>> > > >>>> in Kafka. By "managing disks" we mean the management of assignment of
>>> > > >>>> replicas across disks. Here are my reasons in more detail:
>>> > > >>>>
>>> > > >>>> - I don't think this KIP is a big step change. By allowing user to
>>> > > >>>> configure Kafka to run multiple log directories or disks as of now, it is
>>> > > >>>> implicit that Kafka manages disks. It is just not a complete feature.
>>> > > >>>> Microsoft and probably other companies are using this feature under the
>>> > > >>>> undesirable effect that a broker will fail if any disk fails. It is
>>> > > >>>> good to complete this feature.
>>> > > >>>>
>>> > > >>>> - I think it is reasonable to manage disks in Kafka. One of the most
>>> > > >>>> important things that Kafka does is to determine the replica assignment
>>> > > >>>> across brokers and make sure enough copies of a given replica are
>>> > > >>>> available. I would argue that it is not much different than determining
>>> > > >>>> the replica assignment across disks conceptually.
>>> > > >>>>
>>> > > >>>> - I would agree that this KIP improves performance of Kafka at the cost
>>> > > >>>> of more complexity inside Kafka, by switching from RAID-10 to JBOD. I
>>> > > >>>> would argue that this is the right direction. If we can gain 20%+
>>> > > >>>> performance by managing NIC in Kafka as compared to existing approach and
>>> > > >>>> other alternatives, I would say we should just do it. Such a gain in
>>> > > >>>> performance, or equivalently reduction in cost, can save millions of
>>> > > >>>> dollars per year for any company running Kafka at large scale.
>>> > > >>>>
>>> > > >>>> Thanks,
>>> > > >>>> Dong
>>> > > >>>>
>>> > > >>>>
>>> > > >>>> On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <
>>> eno.there...@gmail.com>
>>> > > >>>> wrote:
>>> > > >>>>
>>> > > >>>>> I'm coming somewhat late to the discussion, apologies for that.
>>> > > >>>>>
>>> > > >>>>> I'm worried about this proposal. It's moving Kafka to a world
>>> where
>>> > > it
>>> > > >>>>> manages disks. So in a sense, the scope of the KIP is limited,
>>> but
>>> > > the
>>> > > >>>>> direction it sets for Kafka is quite a big step change.
>>> Fundamentally
>>> > > >>>> this
>>> > > >>>>> is about balancing resources for a Kafka broker. This can be
>>> done by
>>> > > a
>>> > > >>>>> tool, rather than by changing Kafka. E.g., the tool would take
>>> a
>>> > > bunch
>>> > > >>> of
>>> > > >>>>> disks together, create a volume over them and export that to a
>>> Kafka
>>> > > >>>> broker
>>> > > >>>>> (in addition to setting the memory limits for that broker or
>>> limiting
>>> > > >>>> other
>>> > > >>>>> resources). A different bunch of disks can then make up a
>>> second
>>> > > >>> volume,
>>> > > >>>>> and be used by another Kafka broker. This is aligned with what
>>> Colin
>>> > > is
>>> > > >>>>> saying (as I understand it).
>>> > > >>>>>
>>> > > >>>>> Disks are not the only resource on a machine, there are several
>>> > > >>> instances
>>> > > >>>>> where multiple NICs are used for example. Do we want fine
>>> grained
>>> > > >>>>> management of all these resources? I'd argue that opens up the system
>>> > > >>>>> to a lot of complexity.
>>> > > >>>>>
>>> > > >>>>> Thanks
>>> > > >>>>> Eno
>>> > > >>>>>
>>> > > >>>>>
>>> > > >>>>>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com>
>>> wrote:
>>> > > >>>>>>
>>> > > >>>>>> Hi all,
>>> > > >>>>>>
>>> > > >>>>>> I am going to initiate the vote if there is no further concern with
>>> > > >>>>>> the KIP.
>>> > > >>>>>>
>>> > > >>>>>> Thanks,
>>> > > >>>>>> Dong
>>> > > >>>>>>
>>> > > >>>>>>
>>> > > >>>>>> On Fri, Jan 27, 2017 at 8:08 PM, radai <
>>> radai.rosenbl...@gmail.com>
>>> > > >>>>> wrote:
>>> > > >>>>>>
>>> > > >>>>>>> a few extra points:
>>> > > >>>>>>>
>>> > > >>>>>>> 1. broker per disk might also incur more client <--> broker
>>> > > sockets:
>>> > > >>>>>>> suppose every producer / consumer "talks" to >1 partition,
>>> there's
>>> > > a
>>> > > >>>>> very
>>> > > >>>>>>> good chance that partitions that were co-located on a single
>>> > > 10-disk
>>> > > >>>>> broker
>>> > > >>>>>>> would now be split between several single-disk broker
>>> processes on
>>> > > >>> the
>>> > > >>>>> same
>>> > > >>>>>>> machine. hard to put a multiplier on this, but likely >x1.
>>> sockets
>>> > > >>>> are a
>>> > > >>>>>>> limited resource at the OS level and incur some memory cost
>>> (kernel
>>> > > >>>>>>> buffers)
>>> > > >>>>>>>
>>> > > >>>>>>> 2. there's a memory overhead to spinning up a JVM (compiled
>>> code
>>> > > and
>>> > > >>>>> byte
>>> > > >>>>>>> code objects etc). if we assume this overhead is ~300 MB
>>> (order of
>>> > > >>>>>>> magnitude, specifics vary) then spinning up 10 JVMs would
>>> lose you
>>> > > 3
>>> > > >>>> GB
>>> > > >>>>> of
>>> > > >>>>>>> RAM. not a ton, but non negligible.
>>> > > >>>>>>>
>>> > > >>>>>>> 3. there would also be some overhead downstream of kafka in
>>> any
>>> > > >>>>> management
>>> > > >>>>>>> / monitoring / log aggregation system. likely less than x10
>>> though.
>>> > > >>>>>>>
>>> > > >>>>>>> 4. (related to above) - added complexity of administration
>>> with
>>> > > more
>>> > > >>>>>>> running instances.
>>> > > >>>>>>>
>>> > > >>>>>>> is anyone running kafka with anywhere near 100GB heaps? i
>>> thought
>>> > > >>> the
>>> > > >>>>> point
>>> > > >>>>>>> was to rely on kernel page cache to do the disk buffering
>>> ....
>>> > > >>>>>>>
>>> > > >>>>>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <
>>> lindon...@gmail.com>
>>> > > >>>> wrote:
>>> > > >>>>>>>
>>> > > >>>>>>>> Hey Colin,
>>> > > >>>>>>>>
>>> > > >>>>>>>> Thanks much for the comment. Please see my comments inline.
>>> > > >>>>>>>>
>>> > > >>>>>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <
>>> > > cmcc...@apache.org
>>> > > >>>>
>>> > > >>>>>>> wrote:
>>> > > >>>>>>>>
>>> > > >>>>>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>>> > > >>>>>>>>>> Hey Colin,
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> Good point! Yeah we have actually considered and tested
>>> this
>>> > > >>>>>>> solution,
>>> > > >>>>>>>>>> which we call one-broker-per-disk. It would work and
>>> should
>>> > > >>> require
>>> > > >>>>>>> no
>>> > > >>>>>>>>>> major change in Kafka as compared to this JBOD KIP. So it
>>> would
>>> > > >>> be
>>> > > >>>> a
>>> > > >>>>>>>> good
>>> > > >>>>>>>>>> short term solution.
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> But it has a few drawbacks which make it less desirable
>>> in the
>>> > > >>>> long
>>> > > >>>>>>>>>> term.
>>> > > >>>>>>>>>> Assume we have 10 disks on a machine. Here are the
>>> problems:
>>> > > >>>>>>>>>
>>> > > >>>>>>>>> Hi Dong,
>>> > > >>>>>>>>>
>>> > > >>>>>>>>> Thanks for the thoughtful reply.
>>> > > >>>>>>>>>
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> 1) Our stress test result shows that one-broker-per-disk
>>> has 15%
>>> > > >>>>>>> lower
>>> > > >>>>>>>>>> throughput
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> 2) Controller would need to send 10X as many LeaderAndIsrRequest,
>>> > > >>>>>>>>>> UpdateMetadataRequest and StopReplicaRequest. This increases the
>>> > > >>>>>>>>>> burden on controller which can be the performance bottleneck.
>>> > > >>>>>>>>>
>>> > > >>>>>>>>> Maybe I'm misunderstanding something, but there would not
>>> be 10x
>>> > > >>> as
>>> > > >>>>>>> many
>>> > > >>>>>>>>> StopReplicaRequest RPCs, would there?  The other requests
>>> would
>>> > > >>>>>>> increase
>>> > > >>>>>>>>> 10x, but from a pretty low base, right?  We are not
>>> reassigning
>>> > > >>>>>>>>> partitions all the time, I hope (or else we have bigger
>>> > > >>> problems...)
>>> > > >>>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>> I think the controller will group StopReplicaRequest per broker and
>>> > > >>>>>>>> send only one StopReplicaRequest to a broker during controlled shutdown.
>>> > > >>>>>>>> Anyway, we don't have to worry about this if we agree that other requests
>>> > > >>>>>>>> will increase by 10X. One UpdateMetadataRequest has to be sent to each
>>> > > >>>>>>>> broker in the cluster every time there is a leadership change. I am not
>>> > > >>>>>>>> sure this is a real problem. But in theory this makes the overhead
>>> > > >>>>>>>> complexity O(number of brokers) and may be a concern in the future.
>>> > > >>>>>>>> Ideally we should avoid it.
>>> > > >>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>>>
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> 3) Less efficient use of physical resources on the machine. The
>>> > > >>>>>>>>>> number of sockets on each machine will increase by 10X. The number of
>>> > > >>>>>>>>>> connections between any two machines will increase by 100X.
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> 4) Less efficient way to manage memory and quota.
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> 5) Rebalance between disks/brokers on the same machine will be less
>>> > > >>>>>>>>>> efficient and less flexible. Broker has to read data from another
>>> > > >>>>>>>>>> broker on the same machine via socket. It is also harder to do
>>> > > >>>>>>>>>> automatic load balance between disks on the same machine in the future.
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> I will put this and the explanation in the rejected
>>> alternative
>>> > > >>>>>>>> section.
>>> > > >>>>>>>>>> I
>>> > > >>>>>>>>>> have a few questions:
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> - Can you explain why this solution can help avoid scalability
>>> > > >>>>>>>>>> bottleneck?
>>> > > >>>>>>>>>> I actually think it will exacerbate the scalability problem due to
>>> > > >>>>>>>>>> 2) above.
>>> > > >>>>>>>>>> - Why can we push more RPC with this solution?
>>> > > >>>>>>>>>
>>> > > >>>>>>>>> To really answer this question we'd have to take a deep
>>> dive into
>>> > > >>>> the
>>> > > >>>>>>>>> locking of the broker and figure out how effectively it can
>>> > > >>>>> parallelize
>>> > > >>>>>>>>> truly independent requests.  Almost every multithreaded
>>> process
>>> > > is
>>> > > >>>>>>> going
>>> > > >>>>>>>>> to have shared state, like shared queues or shared
>>> sockets, that
>>> > > >>> is
>>> > > >>>>>>>>> going to make scaling less than linear when you add disks
>>> or
>>> > > >>>>>>> processors.
>>> > > >>>>>>>>> (And clearly, another option is to improve that
>>> scalability,
>>> > > >>> rather
>>> > > >>>>>>>>> than going multi-process!)
>>> > > >>>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>> Yeah I also think it is better to improve scalability
>>> inside kafka
>>> > > >>>> code
>>> > > >>>>>>> if
>>> > > >>>>>>>> possible. I am not sure we currently have any scalability
>>> issue
>>> > > >>>> inside
>>> > > >>>>>>>> Kafka that can not be removed without using multi-process.
>>> > > >>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>>>
>>> > > >>>>>>>>>> - It is true that a garbage collection in one broker
>>> would not
>>> > > >>>> affect
>>> > > >>>>>>>>>> others. But that is after every broker only uses 1/10 of
>>> the
>>> > > >>>> memory.
>>> > > >>>>>>>> Can
>>> > > >>>>>>>>>> we be sure that this will actually help performance?
>>> > > >>>>>>>>>
>>> > > >>>>>>>>> The big question is, how much memory do Kafka brokers use
>>> now,
>>> > > and
>>> > > >>>> how
>>> > > >>>>>>>>> much will they use in the future?  Our experience in HDFS
>>> was
>>> > > that
>>> > > >>>>> once
>>> > > >>>>>>>>> you start getting more than 100-200GB Java heap sizes,
>>> full GCs
>>> > > >>>> start
>>> > > >>>>>>>>> taking minutes to finish when using the standard JVMs.
>>> That
>>> > > alone
>>> > > >>>> is
>>> > > >>>>> a
>>> > > >>>>>>>>> good reason to go multi-process or consider storing more
>>> things
>>> > > >>> off
>>> > > >>>>> the
>>> > > >>>>>>>>> Java heap.
>>> > > >>>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>> I see. Now I agree one-broker-per-disk should be more
>>> efficient in
>>> > > >>>>> terms
>>> > > >>>>>>> of
>>> > > >>>>>>>> GC since each broker probably needs less than 1/10 of the
>>> memory
>>> > > >>>>>>> available
>>> > > >>>>>>>> on a typical machine nowadays. I will remove this from the
>>> reason
>>> > > >>> of
>>> > > >>>>>>>> rejection.
>>> > > >>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>>>
>>> > > >>>>>>>>> Disk failure is the "easy" case.  The "hard" case, which is
>>> > > >>>>>>>>> unfortunately also the much more common case, is disk
>>> > > misbehavior.
>>> > > >>>>>>>>> Towards the end of their lives, disks tend to start
>>> slowing down
>>> > > >>>>>>>>> unpredictably.  Requests that would have completed
>>> immediately
>>> > > >>>> before
>>> > > >>>>>>>>> start taking 20, 100, 500 milliseconds.  Some files may be
>>> > > readable
>>> > > >>>> and
>>> > > >>>>>>>>> other files may not be.  System calls hang, sometimes
>>> forever,
>>> > > and
>>> > > >>>> the
>>> > > >>>>>>>>> Java process can't abort them, because the hang is in the
>>> kernel.
>>> > > >>>> It
>>> > > >>>>>>> is
>>> > > >>>>>>>>> not fun when threads are stuck in "D state"
>>> > > >>>>>>>>> http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state
>>> > > >>>>>>>>> .  Even kill -9 cannot abort the thread then.
>>> Fortunately, this
>>> > > >>> is
>>> > > >>>>>>>>> rare.
>>> > > >>>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>> I agree it is a harder problem and it is rare. We probably
>>> don't
>>> > > >>> have
>>> > > >>>>> to
>>> > > >>>>>>>> worry about it in this KIP since this issue is orthogonal to
>>> > > >>> whether
>>> > > >>>> or
>>> > > >>>>>>> not
>>> > > >>>>>>>> we use JBOD.
>>> > > >>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>>>
>>> > > >>>>>>>>> Another approach we should consider is for Kafka to
>>> implement its
>>> > > >>>> own
>>> > > >>>>>>>>> storage layer that would stripe across multiple disks.
>>> This
>>> > > >>>> wouldn't
>>> > > >>>>>>>>> have to be done at the block level, but could be done at
>>> the file
>>> > > >>>>>>> level.
>>> > > >>>>>>>>> We could use consistent hashing to determine which disks a
>>> file
>>> > > >>>> should
>>> > > >>>>>>>>> end up on, for example.
>>> > > >>>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>> Are you suggesting that we should distribute log, or log
>>> segment,
>>> > > >>>>> across
>>> > > >>>>>>>> disks of brokers? I am not sure if I fully understand this
>>> > > >>> approach.
>>> > > >>>> My
>>> > > >>>>>>> gut
>>> > > >>>>>>>> feel is that this would be a drastic solution that would
>>> require
>>> > > >>>>>>>> non-trivial design. While this may be useful to Kafka, I
>>> would
>>> > > >>> prefer
>>> > > >>>>> not
>>> > > >>>>>>>> to discuss this in detail in this thread unless you believe
>>> it is
>>> > > >>>>>>> strictly
>>> > > >>>>>>>> superior to the design in this KIP in terms of solving our
>>> > > >>> use-case.
>>> > > >>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>>> best,
>>> > > >>>>>>>>> Colin
>>> > > >>>>>>>>>
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> Thanks,
>>> > > >>>>>>>>>> Dong
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <
>>> > > >>> cmcc...@apache.org
>>> > > >>>>>
>>> > > >>>>>>>>>> wrote:
>>> > > >>>>>>>>>>
>>> > > >>>>>>>>>>> Hi Dong,
>>> > > >>>>>>>>>>>
>>> > > >>>>>>>>>>> Thanks for the writeup!  It's very interesting.
>>> > > >>>>>>>>>>>
>>> > > >>>>>>>>>>> I apologize in advance if this has been discussed
>>> somewhere
>>> > > >>> else.
>>> > > >>>>>>>> But
>>> > > >>>>>>>>> I
>>> > > >>>>>>>>>>> am curious if you have considered the solution of running
>>> > > >>> multiple
>>> > > >>>>>>>>>>> brokers per node.  Clearly there is a memory overhead
>>> with this
>>> > > >>>>>>>>> solution
>>> > > >>>>>>>>>>> because of the fixed cost of starting multiple JVMs.
>>> However,
>>> > > >>>>>>>> running
>>> > > >>>>>>>>>>> multiple JVMs would help avoid scalability bottlenecks.
>>> You
>>> > > >>> could
>>> > > >>>>>>>>>>> probably push more RPCs per second, for example.  A
>>> garbage
>>> > > >>>>>>>> collection
>>> > > >>>>>>>>>>> in one broker would not affect the others.  It would be
>>> > > >>>> interesting
>>> > > >>>>>>>> to
>>> > > >>>>>>>>>>> see this considered in the "alternate designs" section,
>>> even if
>>> > > >>> you
>>> > > >>>>>>>> end
>>> > > >>>>>>>>>>> up deciding it's not the way to go.
>>> > > >>>>>>>>>>>
>>> > > >>>>>>>>>>> best,
>>> > > >>>>>>>>>>> Colin
>>> > > >>>>>>>>>>>
>>> > > >>>>>>>>>>>
>>> > > >>>>>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>>> > > >>>>>>>>>>>> Hi all,
>>> > > >>>>>>>>>>>>
>>> > > >>>>>>>>>>>> We created KIP-112: Handle disk failure for JBOD.
>>> Please find
>>> > > >>> the
>>> > > >>>>>>>> KIP
>>> > > >>>>>>>>>>>> wiki
>>> > > >>>>>>>>>>>> in the link
>>> > > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
>>> > > >>>>>>>>>>>>
>>> > > >>>>>>>>>>>> This KIP is related to KIP-113
>>> > > >>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
>>> > > >>>>>>>>>>>> Support replicas movement between log directories. They
>>> are
>>> > > >>>>>>> needed
>>> > > >>>>>>>> in
>>> > > >>>>>>>>>>>> order
>>> > > >>>>>>>>>>>> to support JBOD in Kafka. Please help review the KIP. Your
>>> > > >>>>>>>>>>>> feedback is appreciated!
>>> > > >>>>>>>>>>>>
>>> > > >>>>>>>>>>>> Thanks,
>>> > > >>>>>>>>>>>> Dong
>>> > > >>>>>>>>>>>
>>> > > >>>>>>>>>
>>> > > >>>>>>>>
>>> > > >>>>>>>
>>> > > >>>>>
>>> > > >>>>>
>>> > > >>>>
>>> > > >>>
>>> > > >>>
>>> > > >>>
>>> > > >>> --
>>> > > >>> Grant Henke
>>> > > >>> Software Engineer | Cloudera
>>> > > >>> gr...@cloudera.com | twitter.com/gchenke |
>>> linkedin.com/in/granthenke
>>> > > >>>
>>> > >
>>> > >
>>>
>>
>>
>
