Hi, Dong,

Thanks for the discussion in the KIP meeting today. A few comments inlined
below.

On Mon, Feb 6, 2017 at 7:22 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks for the review! Please see reply inline.
>
> On Mon, Feb 6, 2017 at 6:21 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Thanks for the proposal. A few quick questions/comments.
> >
> > 1. Do you know why your stress test loses 15% of the throughput with the
> > one-broker-per-disk setup?
> >
>
> I think it is probably related to thread scheduling and socket management,
> though I haven't validated this theory.
>
> With one-broker-per-disk setup, each broker has 16 io threads, 12 network
> threads, 14 replica fetcher threads.
> With one-broker-per-machine setup, each broker has 160 io threads, 120
> network threads, 140 replica fetcher threads.
>
> I can test this theory by increasing the number of threads per broker by
> 10X in an existing cluster and seeing whether throughput capacity changes.
> It would not be surprising if performance degrades with 10X threads. But I
> haven't validated this yet.
>


> > 2. In the KIP, it wasn't super clear to me what
> > /broker/topics/[topic]/partitions/[partitionId]/controller_managed_state
> > represents and when it's updated. It seems that this path should be
> > updated every time the disk associated with one of the replicas goes bad
> > or is fixed. However, the wiki only mentions updating this path when a new
> > topic is created. It would be useful to provide a high level description
> > of what this path stores, when it's updated and by whom.
> >
>
> This path will only be updated by the controller. When a replica is
> successfully created on a broker, the controller adds this replica id to
> the "created" list of the corresponding partition. When a replica needs to
> be re-created because the bad disk is replaced with an empty good disk, the
> user executes kafka-log-dirs.sh so that the controller will remove this
> replica id from the "created" list of the corresponding partition.
>
> The first part is described in the "Topic gets created" scenario. The second
> part is partly covered in the "The disk (or log directory) gets fixed"
> scenario, but it is not clear because it doesn't mention the full ZooKeeper
> path. I have made this change
> <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=7&selectedPageVersions=8>
> in the KIP to clarify the second part.
>
> Currently I have used steps per scenario to describe how the KIP works.
> Would you like me to have a section to describe how this ZK path is
> managed?
>
>
>
So, it seems that
/broker/topics/[topic]/partitions/[partitionId]/controller_managed_state
is a reflection of the log directory state in the broker. It would be
useful to describe how the broker maintains the directory state and whether
that state is reset during broker restart.

For completeness, it would be useful to also describe what happens during
(a) controller failover, (b) partition reassignment, (c) topic deletion
(for example, what happens when a replica to be deleted is on a failed log
directory).
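
To make the "created" list described above concrete, here is a minimal
sketch of the bookkeeping as I understand it from the reply. The class and
method names are hypothetical, and this is an in-memory model only, not the
actual controller or ZooKeeper code.

```python
class ControllerManagedState:
    """Hypothetical model of the per-partition "created" replica list."""

    def __init__(self):
        # (topic, partition_id) -> set of replica ids marked as created
        self._created = {}

    def replica_created(self, topic, partition_id, replica_id):
        # Controller adds the replica id once the replica exists on a broker.
        self._created.setdefault((topic, partition_id), set()).add(replica_id)

    def replica_reset(self, topic, partition_id, replica_id):
        # Controller removes the replica id after the bad disk is replaced
        # and the user asks for the replica to be re-created.
        self._created.get((topic, partition_id), set()).discard(replica_id)

    def created(self, topic, partition_id):
        return sorted(self._created.get((topic, partition_id), set()))


state = ControllerManagedState()
state.replica_created("t", 0, 1)
state.replica_created("t", 0, 2)
state.replica_reset("t", 0, 1)
print(state.created("t", 0))  # [2]
```

The sketch leaves open exactly the cases asked about here: what this state
should look like after controller failover, reassignment, or topic deletion.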


> > 3. The proposal uses ZK to propagate disk failure/recovery from the broker
> > to the controller. Not sure if this is the best approach in the long term.
> > It may be better for the broker to send RPC requests directly to the
> > controller?
> >
>
> I chose to propagate this information via ZK for simplicity of design and
> implementation, since ISR notification is passed via ZK and most events
> (e.g. broker offline, partition reassignment) are triggered in the
> controller via ZK listeners. Yes, it can be implemented using RPC, but I am
> not sure what we gain by using RPC instead of ZK. Should we have a separate
> KIP in the future to migrate all existing notifications to RPC?
>
>
My concern with ZK-based communication is efficiency. To send a message
from the broker to the controller in this approach, the sender needs to do
1 write to ZK and the receiver needs to do 1 read from ZK, followed by 1
delete to ZK. So, we will need a total of 5 RPCs (a read from ZK requires 1
RPC and a write/delete to ZK requires at least 2 RPCs). If the broker can
send a message directly to the controller, it just needs 1 RPC. Another
potential issue with the ZK-based approach is that it's hard for the sender
to receive a response. We made an exception for using ZK-based notification
for ISR propagation since it's a quicker way to fix an existing problem.
Since we are adding a new feature, it would be useful to think through
what's the best way for the broker to communicate with the controller in
the long term.
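
The RPC accounting above can be written out as a small sketch. The
per-operation costs are the ones stated in this mail (1 RPC per ZK read, at
least 2 per ZK write or delete). The names are illustrative only.

```python
# RPCs per ZooKeeper operation, as assumed in the text above.
# Writes and deletes cost "at least" 2, so the total is a lower bound.
ZK_RPCS = {"read": 1, "write": 2, "delete": 2}


def zk_notification_rpcs():
    """Broker writes a znode, controller reads it, then deletes it."""
    return ZK_RPCS["write"] + ZK_RPCS["read"] + ZK_RPCS["delete"]


# Broker sends a single request straight to the controller.
DIRECT_RPCS = 1

print(zk_notification_rpcs(), DIRECT_RPCS)  # 5 1
```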


Thanks,

Jun


> >
> > Jun
> >
> >
> > On Wed, Jan 25, 2017 at 1:50 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Colin,
> > >
> > > Good point! Yeah we have actually considered and tested this solution,
> > > which we call one-broker-per-disk. It would work and should require no
> > > major change in Kafka as compared to this JBOD KIP. So it would be a
> > > good short term solution.
> > >
> > > But it has a few drawbacks which make it less desirable in the long
> > > term. Assume we have 10 disks on a machine. Here are the problems:
> > >
> > > 1) Our stress test result shows that one-broker-per-disk has 15% lower
> > > throughput
> > >
> > > 2) Controller would need to send 10X as many LeaderAndIsrRequest,
> > > UpdateMetadataRequest and StopReplicaRequest. This increases the burden
> > > on the controller, which can be the performance bottleneck.
> > >
> > > 3) Less efficient use of physical resources on the machine. The number
> > > of sockets on each machine will increase by 10X. The number of
> > > connections between any two machines will increase by 100X.
> > >
> > > 4) Less efficient way to manage memory and quota.
> > >
> > > 5) Rebalance between disks/brokers on the same machine will be less
> > > efficient and less flexible. A broker has to read data from another
> > > broker on the same machine via socket. It is also harder to do automatic
> > > load balancing between disks on the same machine in the future.
> > >
> > > I will put this and the explanation in the rejected alternative
> > > section. I have a few questions:
> > >
> > > - Can you explain why this solution can help avoid scalability
> > > bottleneck? I actually think it will exacerbate the scalability problem
> > > due to 2) above.
> > > - Why can we push more RPC with this solution?
> > > - It is true that a garbage collection in one broker would not affect
> > > others. But that is after every broker only uses 1/10 of the memory.
> > > Can we be sure that this will actually help performance?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the writeup!  It's very interesting.
> > > >
> > > > I apologize in advance if this has been discussed somewhere else.
> > > > But I am curious if you have considered the solution of running
> > > > multiple brokers per node.  Clearly there is a memory overhead with
> > > > this solution because of the fixed cost of starting multiple JVMs.
> > > > However, running multiple JVMs would help avoid scalability
> > > > bottlenecks.  You could probably push more RPCs per second, for
> > > > example.  A garbage collection in one broker would not affect the
> > > > others.  It would be interesting to see this considered in the
> > > > "alternate designs" section, even if you end up deciding it's not the
> > > > way to go.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
> > > > > Hi all,
> > > > >
> > > > > We created KIP-112: Handle disk failure for JBOD. Please find the
> > > > > KIP wiki at
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
> > > > >
> > > > > This KIP is related to KIP-113
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
> > > > > Support replicas movement between log directories. They are needed
> > > > > in order to support JBOD in Kafka. Please help review the KIP. Your
> > > > > feedback is appreciated!
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > >
> > >
> >
>
