Hey Jun,

Thanks much for your detailed comments. Please see my reply below.

On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Some more comments below.
>
> 10. For the .move log, do we perform any segment deletion (based on
> retention) or log cleaning (if a compacted topic)? Or do we only enable
> that after the swap?
>
> 11. kafka-reassign-partitions.sh
> 11.1 If all reassigned replicas are in the current broker and only the log
> directories have changed, we can probably optimize the tool to not trigger
> partition reassignment through the controller and only
> send ChangeReplicaDirRequest.
>

Yes, the reassignment script should not create the reassignment znode if no
replicas are to be moved between brokers. This case falls under the "How to
move replica between log directories on the same broker" part of the Proposed
Change section.
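
To illustrate the decision the script would make, here is a rough sketch in
Python (not actual tool code). The function name and the data layout
({partition: {broker_id: log_dir}}) are made up for illustration, and replica
ordering is ignored for simplicity.

```python
def plan_reassignment(current, target):
    """Decide whether the controller is needed and which replicas need a
    ChangeReplicaDirRequest.

    current/target: {partition: {broker_id: log_dir or None}} (hypothetical layout).
    """
    # The controller (reassignment znode) is only needed if the set of
    # brokers hosting some partition changes.
    needs_controller = any(
        set(current.get(p, {})) != set(replicas) for p, replicas in target.items()
    )
    # Every replica whose requested log directory differs from its current one
    # gets a ChangeReplicaDirRequest sent to its broker.
    dir_changes = [
        (p, broker, log_dir)
        for p, replicas in target.items()
        for broker, log_dir in replicas.items()
        if log_dir is not None and current.get(p, {}).get(broker) != log_dir
    ]
    return needs_controller, dir_changes
```

With this sketch, a plan that only changes log directories returns
needs_controller == False, so the script would skip the znode entirely.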


> 11.2 If ChangeReplicaDirRequest specifies a replica that's not created yet,
> could the broker just remember that in memory and create the replica when
> the creation is requested? This way, when doing cluster expansion, we can
> make sure that the new replicas on the new brokers are created in the right
> log directory in the first place. We can also avoid the tool having to keep
> issuing ChangeReplicaDirRequest in response to
> ReplicaNotAvailableException.
>

I am concerned that the ChangeReplicaDirRequest would be lost if the broker
restarts after it sends ChangeReplicaDirResponse but before it receives
LeaderAndIsrRequest. In this case, the user will receive success when they
initiate replica reassignment, but the verification they run later will never
show the reassignment as complete. This would be confusing to the user.

There are three different approaches to this problem if the broker has not
yet created the replica when it receives ChangeReplicaDirRequest:

1) Broker immediately replies to user with ReplicaNotAvailableException and
user can decide to retry later. The advantage of this solution is
that the broker logic is very simple and the reassignment script logic also
seems straightforward. The disadvantage is that the user script has to retry.
But it seems fine - we can set the interval between retries to 0.5 sec so
that the broker won't be bombarded by those requests. This is the solution
chosen in the current KIP.

2) Broker can put ChangeReplicaDirRequest in a purgatory with timeout and
reply to user after the replica has been created. I didn't choose this in
the interest of keeping broker logic simpler.

3) Broker can remember that by making a mark on disk, e.g. creating a
topicPartition.tomove directory in the destination log directory. This mark
will be persisted across broker restart. This is the first idea I had but I
replaced it with solution 1) in the interest of keeping broker simple.

It seems that solution 1) is the simplest one that works. But I am OK to
switch to the other two solutions if we don't want the retry logic. What do
you think?
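
To make the retry logic of solution 1) concrete, here is a rough sketch in
Python (not actual tool code). ReplicaNotAvailableError, send_request and
max_attempts are hypothetical stand-ins; only the 0.5 sec interval comes from
the text above.

```python
import time


class ReplicaNotAvailableError(Exception):
    """Hypothetical stand-in for the ReplicaNotAvailableException error."""


def change_replica_dir_with_retry(send_request, retry_interval_sec=0.5,
                                  max_attempts=20, sleep=time.sleep):
    """Call send_request() until it stops raising ReplicaNotAvailableError.

    Returns the number of attempts used. Re-raises the error if the replica
    is still not available after max_attempts (an assumed upper bound).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            send_request()
            return attempt
        except ReplicaNotAvailableError:
            if attempt == max_attempts:
                raise
            # Wait between retries so the broker is not bombarded.
            sleep(retry_interval_sec)
```

The sleep function is passed in only so the loop can be exercised without
actually waiting.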


> 11.3 Do we need an option in the tool to specify
> intra.broker.throttled.rate?
>

I don't find it useful to add this option to kafka-reassign-partitions.sh.
The reason we have the option "--throttle" in the script to throttle
replication rate is that we usually want a higher quota when fixing an offline
replica to get it out of URP (under-replicated partitions), but we are OK with
a lower quota if we are moving replicas only to balance the cluster. Thus it is
common for SREs to use different quotas when using kafka-reassign-partitions.sh
to move replicas between brokers.

However, the only reason for moving replicas between log directories of the
same broker is to balance cluster resources. Thus the option to specify
intra.broker.throttled.rate in the tool is not that useful. I am
inclined not to add this option, to keep the tool's usage simpler.


>
> 12. DescribeDirsRequest
> 12.1 In other requests like CreateTopicRequest, we return an empty list in
> the response for an empty input list. If the input list is null, we return
> everything. We should probably follow the same convention here.
>

Thanks. I wasn't aware of this convention. I have changed
DescribeDirsRequest so that "null" indicates "all".
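
As a rough illustration of the convention (a Python sketch, not broker code;
the function name is made up and None plays the role of "null"):

```python
def select_log_dirs(requested, all_log_dirs):
    """Apply the null/empty convention to a requested list of log directories.

    None  -> describe every log directory on the broker.
    []    -> describe nothing (empty input gives empty output).
    [...] -> describe only the listed directories that exist on the broker
             (how unknown directories are reported is left out of this sketch).
    """
    if requested is None:
        return list(all_log_dirs)
    return [d for d in requested if d in all_log_dirs]
```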


> 12.2 Do we need the topics field? Since the request is about log dirs, it
> makes sense to specify the log dirs. But it's weird to specify topics.
>

The topics field is not necessary. But it is useful to reduce the response
size in case users are only interested in the status of a few topics. For
example, a user may have initiated the reassignment of a given replica from
one log directory to another log directory on the same broker, and the user
only wants to check the status of this given partition by looking
at DescribeDirsResponse. Thus this field is useful.

I am not sure if it is weird to call this request DescribeDirsRequest. The
response is a map from log directory to information about some partitions on
that log directory. Do you think we need to change the name of the request?


> 12.3 DescribeDirsResponsePartition: Should we include firstOffset and
> nextOffset in the response? That could be useful to track the progress of
> the movement.
>

Yeah good point. I agree it is useful to include logEndOffset in the
response. According to the Log.scala doc, the logEndOffset is equivalent to
the nextOffset. Users can track progress by checking the difference between
the logEndOffset of the given partition in the source and destination log
directories. I have added logEndOffset to the DescribeDirsResponsePartition
in the KIP.

But it seems that we don't need firstOffset in the response. Do you think
firstOffset is still needed?
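
The progress check described above is just a subtraction. A minimal sketch in
Python, with hypothetical function names and the two logEndOffset values
assumed to be read from DescribeDirsResponse:

```python
def remaining_offsets(source_log_end_offset, dest_log_end_offset):
    """Offsets the destination replica still has to copy (never negative)."""
    return max(0, source_log_end_offset - dest_log_end_offset)


def is_caught_up(source_log_end_offset, dest_log_end_offset):
    """True once the destination replica has reached the source's logEndOffset."""
    return remaining_offsets(source_log_end_offset, dest_log_end_offset) == 0
```

For example, a source logEndOffset of 1000 and a destination logEndOffset of
750 means 250 offsets are still to be copied.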


>
> 13. ChangeReplicaDirResponse: Do we need error code at both levels?
>

My bad. It is not needed. I have removed request level error code. I also
added ChangeReplicaDirRequestTopic and ChangeReplicaDirResponseTopic to
reduce duplication of the "topic" string in the request and response.


>
> 14. num.replica.move.threads: Does it default to # log dirs?
>

No. It doesn't. I expect the default to be set to a conservative value
such as 3. It may be surprising to users if the number of threads increases
just because they have assigned more log directories to the Kafka broker.

It seems that the number of replica move threads doesn't have to depend on
the number of log directories. It is possible to have one thread that moves
replicas across all log directories. On the other hand we can have multiple
threads to move replicas to the same log directory. For example, if broker
uses SSD, the CPU instead of disk IO may be the replica move bottleneck and
it will be faster to move replicas using multiple threads per log directory.
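
To illustrate that the thread count is independent of the number of log
directories, here is a rough Python sketch (not broker code). The pool size 3
mirrors the example default above; move_replica is a placeholder that does no
real copying.

```python
from concurrent.futures import ThreadPoolExecutor

# Assumed conservative default; it does not depend on the number of log dirs.
NUM_REPLICA_MOVE_THREADS = 3


def move_replica(move):
    """Placeholder for copying one replica to its destination log directory."""
    replica, dest_dir = move
    return (replica, dest_dir)


def run_moves(moves, num_threads=NUM_REPLICA_MOVE_THREADS):
    """Run all moves on a fixed-size pool; any thread may serve any log dir."""
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # map() returns results in the same order as the input moves.
        return list(pool.map(move_replica, moves))
```

Here several threads can work on moves into the same log directory, and a
single thread can equally serve moves into different directories.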


>
> Thanks,
>
> Jun
>
> On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > I just made one correction in the KIP. If broker receives
> > ChangeReplicaDirRequest and the replica hasn't been created there, the
> > broker will respond ReplicaNotAvailableException.
> > The kafka-reassign-partitions.sh will need to re-send
> > ChangeReplicaDirRequest in this case in order to wait for controller to
> > send LeaderAndIsrRequest to broker. The previous approach of creating an
> > empty directory seems hacky.
> >
> >
> >
> >
> > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for your comments! I have updated the KIP to address your
> > comments.
> > > Please see my reply inline.
> > >
> > > Can you let me know if the latest KIP has addressed your comments?
> > >
> > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > >> Hi, Dong,
> > >>
> > >> Thanks for the reply.
> > >>
> > >> 1.3 So the thread gets the lock, checks if caught up and releases the
> > lock
> > >> if not? Then, in the case when there is continuous incoming data, the
> > >> thread may never get a chance to swap. One way to address this is when
> > the
> > >> thread is getting really close in catching up, just hold onto the lock
> > >> until the thread fully catches up.
> > >>
> > >
> > > Yes, that was my original solution. I see your point that the lock may
> > not
> > > be fairly assigned to ReplicaMoveThread and RequestHandlerThread when
> > there
> > > are frequent incoming requests. Your solution should address the problem
> > and I
> > > have updated the KIP to use it.
> > >
> > >
> > >>
> > >> 2.3 So, you are saying that the partition reassignment tool can first
> > send
> > >> a ChangeReplicaDirRequest to relevant brokers to establish the log dir
> > for
> > >> replicas not created yet, then trigger the partition movement across
> > >> brokers through the controller? That's actually a good idea. Then, we
> > can
> > >> just leave LeaderAndIsrRequest as it is.
> > >
> > >
> > > Yes, that is what I plan to do. If broker receives a
> > > ChangeReplicaDirRequest while it is not leader or follower of the
> > > partition, the broker will create an empty Log instance (i.e. a
> directory
> > > named topicPartition) in the destination log directory so that the
> > replica
> > > will be placed there when broker receives LeaderAndIsrRequest from the
> > > controller. The broker should clean up those empty Log instances on startup
> > > just in case a ChangeReplicaDirRequest was mistakenly sent to a broker
> > that
> > > was not meant to be follower/leader of the partition.
> > >
> > >
> > >> Another thing related to
> > >> ChangeReplicaDirRequest.
> > >> Since this request may take long to complete, I am not sure if we
> should
> > >> wait for the movement to complete before respond. While waiting for
> the
> > >> movement to complete, the idle connection may be killed or the client
> > may
> > >> be gone already. An alternative is to return immediately and add a new
> > >> request like CheckReplicaDirRequest to see if the movement has
> > completed.
> > >> The tool can take advantage of that to check the status.
> > >>
> > >
> > > I agree with your concern and solution. We need request to query the
> > > partition -> log_directory mapping on the broker. I have updated the
> KIP
> > to
> > > remove need for ChangeReplicaDirRequestPurgatory.
> > > Instead, kafka-reassign-partitions.sh will send
> DescribeDirsRequest
> > > to brokers when user wants to verify the partition assignment. Since we
> > > need this DescribeDirsRequest anyway, we can also use this request to
> > > expose stats like the individual log size instead of using JMX. One
> > > drawback of using JMX is that user has to manage the JMX port and
> related
> > > credentials if they haven't already done this, which is the case at
> > > LinkedIn.
> > >
> > >
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >>
> > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >>
> > >> > Hey Jun,
> > >> >
> > >> > Thanks for the detailed explanation. I will use the separate thread
> > >> pool to
> > >> > move replica between log directories. I will let you know when the
> KIP
> > >> has
> > >> > been updated to use a separate thread pool.
> > >> >
> > >> > Here is my response to your other questions:
> > >> >
> > >> > 1.3 My idea is that the ReplicaMoveThread that moves data should get
> > the
> > >> > lock before checking whether the replica in the destination log
> > >> directory
> > >> > has caught up. If the new replica has caught up, then the
> > >> ReplicaMoveThread
> > >> > should swap the replica while it is still holding the lock. The
> > >> > ReplicaFetcherThread or RequestHandlerThread will not be able to
> > append
> > >> > data to the replica in the source log directory during this period because
> > >> they
> > >> > can not get the lock. Does this address the problem?
> > >> >
> > >> > 2.3 I get your point that we want to keep controller simpler. If
> admin
> > >> tool
> > >> > can send ChangeReplicaDirRequest to move data within a broker, then
> > >> > controller probably doesn't even need to include log directory path
> in
> > >> the
> > >> > LeaderAndIsrRequest. How about this: controller will only deal with
> > >> > reassignment across brokers as it does now. If user specified
> > >> destination
> > >> > replica for any disk, the admin tool will send
> ChangeReplicaDirRequest
> > >> and
> > >> > wait for response from broker to confirm that all replicas have been
> > >> moved
> > >> > to the destination log directory. The broker will put
> > >> > ChangeReplicaDirRequest in a purgatory and respond either when the
> > >> movement
> > >> > is completed or when the request has timed out.
> > >> >
> > >> > 4. I agree that we can expose these metrics via JMX. But I am not
> sure
> > >> if
> > >> > it can be obtained easily with good performance using either
> existing
> > >> tools
> > >> > or new script in kafka. I will ask SREs for their opinion.
> > >> >
> > >> > Thanks,
> > >> > Dong
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io> wrote:
> > >> >
> > >> > > Hi, Dong,
> > >> > >
> > >> > > Thanks for the updated KIP. A few more comments below.
> > >> > >
> > >> > > 1.1 and 1.2: I am still not sure there is enough benefit of
> reusing
> > >> > > ReplicaFetchThread
> > >> > > to move data across disks.
> > >> > > (a) A big part of ReplicaFetchThread is to deal with issuing and
> > >> tracking
> > >> > > fetch requests. So, it doesn't feel that we get much from reusing
> > >> > > ReplicaFetchThread
> > >> > > only to disable the fetching part.
> > >> > > (b) The leader replica has no ReplicaFetchThread to start with. It
> > >> feels
> > >> > > weird to start one just for intra broker data movement.
> > >> > > (c) The ReplicaFetchThread is per broker. Intuitively, the number
> of
> > >> > > threads doing intra broker data movement should be related to the
> > >> number
> > >> > of
> > >> > > disks in the broker, not the number of brokers in the cluster.
> > >> > > (d) If the destination disk fails, we want to stop the intra
> broker
> > >> data
> > >> > > movement, but want to continue inter broker replication. So,
> > >> logically,
> > >> > it
> > >> > > seems it's better to separate out the two.
> > >> > > (e) I am also not sure if we should reuse the existing throttling
> > for
> > >> > > replication. It's designed to handle traffic across brokers and
> the
> > >> > > delaying is done in the fetch request. So, if we are not doing
> > >> > > fetching in ReplicaFetchThread,
> > >> > > I am not sure the existing throttling is effective. Also, when
> > >> specifying
> > >> > > the throttling of moving data across disks, it seems the user
> > >> shouldn't
> > >> > > care about whether a replica is a leader or a follower. Reusing
> the
> > >> > > existing throttling config name will be awkward in this regard.
> > >> > > (f) It seems it's simpler and more consistent to use a separate
> > thread
> > >> > pool
> > >> > > for local data movement (for both leader and follower replicas).
> > This
> > >> > > process can then be configured (e.g. number of threads, etc) and
> > >> > throttled
> > >> > > independently.
> > >> > >
> > >> > > 1.3 Yes, we will need some synchronization there. So, if the
> > movement
> > >> > > thread catches up, gets the lock to do the swap, but realizes that
> > new
> > >> > data
> > >> > > is added, it has to continue catching up while holding the lock?
> > >> > >
> > >> > > 2.3 The benefit of including the desired log directory in
> > >> > > LeaderAndIsrRequest
> > >> > > during partition reassignment is that the controller doesn't need
> to
> > >> > track
> > >> > > the progress for disk movement. So, you don't need the additional
> > >> > > BrokerDirStateUpdateRequest. Then the controller never needs to
> > issue
> > >> > > ChangeReplicaDirRequest.
> > >> > > Only the admin tool will issue ChangeReplicaDirRequest to move
> data
> > >> > within
> > >> > > a broker. I agree that this makes LeaderAndIsrRequest more
> > >> complicated,
> > >> > but
> > >> > > that seems simpler than changing the controller to track
> additional
> > >> > states
> > >> > > during partition reassignment.
> > >> > >
> > >> > > 4. We want to make a decision on how to expose the stats. So far,
> we
> > >> are
> > >> > > exposing stats like the individual log size as JMX. So, one way is
> > to
> > >> > just
> > >> > > add new jmx to expose the log directory of individual replicas.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <lindon...@gmail.com>
> > >> wrote:
> > >> > >
> > >> > > > Hey Jun,
> > >> > > >
> > >> > > > Thanks for all the comments! Please see my answer below. I have
> > >> updated
> > >> > > the
> > >> > > > KIP to address most of the questions and make the KIP easier to
> > >> > > understand.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Dong
> > >> > > >
> > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io>
> wrote:
> > >> > > >
> > >> > > > > Hi, Dong,
> > >> > > > >
> > >> > > > > Thanks for the KIP. A few comments below.
> > >> > > > >
> > >> > > > > 1. For moving data across directories
> > >> > > > > 1.1 I am not sure why we want to use ReplicaFetcherThread to
> > move
> > >> > data
> > >> > > > > around in the leader. ReplicaFetchThread fetches data from
> > socket.
> > >> > For
> > >> > > > > moving data locally, it seems that we want to avoid the socket
> > >> > > overhead.
> > >> > > > >
> > >> > > >
> > >> > > > The purpose of using ReplicaFetchThread is to re-use existing
> > thread
> > >> > > > instead of creating more threads and make our thread model more
> > >> > complex.
> > >> > > It
> > >> > > > seems like a natural choice for copying data between disks since
> it
> > >> is
> > >> > > > similar to copying data between brokers. Another reason is that
> if
> > >> the
> > >> > > > replica to be moved is a follower, we don't need lock to swap
> > >> replicas
> > >> > > when
> > >> > > > destination replica has caught up, since the same thread which
> is
> > >> > > fetching
> > >> > > > data from leader will swap the replica.
> > >> > > >
> > >> > > > The ReplicaFetchThread will not incur socket overhead while
> > copying
> > >> > data
> > >> > > > between disks. It will read directly from source disk (as we do
> > when
> > >> > > > processing FetchRequest) and write to destination disk (as we do
> > >> when
> > >> > > > processing ProduceRequest).
> > >> > > >
> > >> > > >
> > >> > > > > 1.2 I am also not sure about moving data in the
> > >> ReplicaFetcherThread
> > >> > in
> > >> > > > the
> > >> > > > > follower. For example, I am not sure setting
> > >> replica.fetch.max.wait
> > >> > to
> > >> > > 0
> > >> > > > >  is ideal. It may not always be effective since a fetch
> request
> > in
> > >> > the
> > >> > > > > ReplicaFetcherThread could be arbitrarily delayed due to
> > >> replication
> > >> > > > > throttling on the leader. In general, the data movement logic
> > >> across
> > >> > > > disks
> > >> > > > > seems different from that in ReplicaFetcherThread. So, I am
> not
> > >> sure
> > >> > > why
> > >> > > > > they need to be coupled.
> > >> > > > >
> > >> > > >
> > >> > > > While it may not be the most efficient way to copy data between
> > >> local
> > >> > > > disks, it will be at least as efficient as copying data from
> > leader
> > >> to
> > >> > > the
> > >> > > > destination disk. The expected goal of KIP-113 is to enable data
> > >> > movement
> > >> > > > between disks with no less efficiency than what we do now when
> > >> moving
> > >> > > data
> > >> > > > between brokers. I think we can optimize its performance using
> > >> separate
> > >> > > > thread if the performance is not good enough.
> > >> > > >
> > >> > > >
> > >> > > > > 1.3 Could you add a bit more details on how we swap the
> replicas
> > >> when
> > >> > > the
> > >> > > > > new ones are fully caught up? For example, what happens when
> the
> > >> new
> > >> > > > > replica in the new log directory is caught up, but when we
> want
> > >> to do
> > >> > > the
> > >> > > > > swap, some new data has arrived?
> > >> > > > >
> > >> > > >
> > >> > > > If the replica is a leader, then ReplicaFetcherThread will
> perform
> > >> the
> > >> > > > replacement. Proper lock is needed to prevent
> KafkaRequestHandler
> > >> from
> > >> > > > appending data to the topicPartition.log on the source disks
> > before
> > >> > this
> > >> > > > replacement is completed by ReplicaFetcherThread.
> > >> > > >
> > >> > > > If the replica is a follower, because the same
> ReplicaFetchThread
> > >> which
> > >> > > > fetches data from leader will also swap the replica , no lock is
> > >> > needed.
> > >> > > >
> > >> > > > I have updated the KIP to specify both more explicitly.
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > > 1.4 Do we need to do the .move at the log segment level or
> could
> > >> we
> > >> > > just
> > >> > > > do
> > >> > > > > that at the replica directory level? Renaming just a directory
> > is
> > >> > much
> > >> > > > > faster than renaming the log segments.
> > >> > > > >
> > >> > > >
> > >> > > > Great point. I have updated the KIP to rename the log directory
> > >> > instead.
> > >> > > >
> > >> > > >
> > >> > > > > 1.5 Could you also describe a bit what happens when either the
> > >> source
> > >> > > or
> > >> > > > > the target log directory fails while the data moving is in
> > >> progress?
> > >> > > > >
> > >> > > >
> > >> > > > If source log directory fails, then the replica movement will
> stop
> > >> and
> > >> > > the
> > >> > > > source replica is marked offline. If destination log directory
> > >> fails,
> > >> > > then
> > >> > > > the replica movement will stop. I have updated the KIP to
> clarify
> > >> this.
> > >> > > >
> > >> > > >
> > >> > > > >
> > >> > > > > 2. For partition reassignment.
> > >> > > > > 2.1 I am not sure if the controller can block on
> > >> > > ChangeReplicaDirRequest.
> > >> > > > > Data movement may take a long time to complete. If there is an
> > >> > > > outstanding
> > >> > > > > request from the controller to a broker, that broker won't be
> > >> able to
> > >> > > > > process any new request from the controller. So if another
> event
> > >> > (e.g.
> > >> > > > > broker failure) happens when the data movement is in progress,
> > >> > > subsequent
> > >> > > > > LeaderAndIsrRequest will be delayed.
> > >> > > > >
> > >> > > >
> > >> > > > Yeah good point. I missed the fact that there can be only one
> > >> inflight
> > >> > > > request from controller to broker.
> > >> > > >
> > >> > > > How about I add a request, e.g. BrokerDirStateUpdateRequest,
> which
> > >> maps
> > >> > > > topicPartition to log directory and can be sent from broker to
> > >> > controller
> > >> > > > to indicate completion?
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > > 2.2 in the KIP, the partition reassignment tool is also used
> for
> > >> > cases
> > >> > > > > where an admin just wants to balance the existing data across
> > log
> > >> > > > > directories in the broker. In this case, it seems that it's overkill to
> > >> > > > > have the process go through the controller. A simpler approach
> > is
> > >> to
> > >> > > > issue
> > >> > > > > an RPC request to the broker directly.
> > >> > > > >
> > >> > > >
> > >> > > > I agree we can optimize this case. It is just that we have to
> add
> > >> new
> > >> > > logic
> > >> > > > or code path to handle a scenario that is already covered by the
> > >> more
> > >> > > > complicated scenario. I will add it to the KIP.
> > >> > > >
> > >> > > >
> > >> > > > > 2.3 When using the partition reassignment tool to move
> replicas
> > >> > across
> > >> > > > > brokers, it makes sense to be able to specify the log directory
> > of
> > >> the
> > >> > > > newly
> > >> > > > > created replicas. The KIP does that in two separate requests
> > >> > > > > ChangeReplicaDirRequest and LeaderAndIsrRequest, and tracks
> the
> > >> > > progress
> > >> > > > of
> > >> > > > > each independently. An alternative is to do that just in
> > >> > > > > LeaderAndIsrRequest.
> > >> > > > > That way, the new replicas will be created in the right log
> dir
> > in
> > >> > the
> > >> > > > > first place and the controller just needs to track the
> progress
> > of
> > >> > > > > partition reassignment in the current way.
> > >> > > > >
> > >> > > >
> > >> > > > I agree it is better to use one request instead of two to
> request
> > >> > replica
> > >> > > > movement between disks. But I think the performance advantage of
> > >> doing
> > >> > so
> > >> > > > is negligible because we trigger replica assignment much less
> than
> > >> all
> > >> > > > other kinds of events in the Kafka cluster. I am not sure that
> the
> > >> > > benefit
> > >> > > > of doing this is worth the effort to add an optional string
> field
> > in
> > >> > the
> > >> > > > LeaderAndIsrRequest. Also if we add this optional field in the
> > >> > > > LeaderAndIsrRequest, we probably want to remove
> > >> ChangeReplicaDirRequest
> > >> > > to
> > >> > > > avoid having two requests doing the same thing. But it means
> user
> > >> > script
> > >> > > > can not send request directly to the broker to trigger replica
> > >> movement
> > >> > > > between log directories.
> > >> > > >
> > >> > > > I will do it if you feel strongly about this optimization.
> > >> > > >
> > >> > > >
> > >> > > > >
> > >> > > > > 3. /admin/reassign_partitions: Including the log dir in every
> > >> replica
> > >> > > may
> > >> > > > > not be efficient. We could include a list of log directories
> and
> > >> > > > reference
> > >> > > > > the index of the log directory in each replica.
> > >> > > > >
> > >> > > >
> > >> > > > Good point. I have updated the KIP to use this solution.
> > >> > > >
> > >> > > >
> > >> > > > >
> > >> > > > > 4. DescribeDirsRequest: The stats in the request are already
> > >> > available
> > >> > > > from
> > >> > > > > JMX. Do we need the new request?
> > >> > > > >
> > >> > > >
> > >> > > > Does JMX also include the state (i.e. offline or online) of each
> > log
> > >> > > > directory and the log directory of each replica? If not, then
> > maybe
> > >> we
> > >> > > > still need DescribeDirsRequest?
> > >> > > >
> > >> > > >
> > >> > > > >
> > >> > > > > 5. We want to be consistent on ChangeReplicaDirRequest vs
> > >> > > > > ChangeReplicaRequest.
> > >> > > > >
> > >> > > >
> > >> > > > I think ChangeReplicaRequest and ChangeReplicaResponse is my
> typo.
> > >> > Sorry,
> > >> > > > they are fixed now.
> > >> > > >
> > >> > > >
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > >
> > >> > > > > Jun
> > >> > > > >
> > >> > > > >
> > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <lindon...@gmail.com
> >
> > >> > wrote:
> > >> > > > >
> > >> > > > > > Hey Alexey,
> > >> > > > > >
> > >> > > > > > Thanks for all the comments!
> > >> > > > > >
> > >> > > > > > I have updated the KIP to specify how we enforce quota. I
> also
> > >> > > updated
> > >> > > > > the
> > >> > > > > > "The thread model and broker logic for moving replica data
> > >> between
> > >> > > log
> > >> > > > > > directories" to make it easier to read. You can find the
> exact
> > >> > change
> > >> > > > > here
> > >> > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>.
> > >> > > > > > The idea is to use the same replication quota mechanism
> > >> introduced
> > >> > in
> > >> > > > > > KIP-73.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Dong
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <
> > >> > > aozerit...@yandex.ru
> > >> > > > >
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:
> > >> > > > > > > > Hey Alexey,
> > >> > > > > > > >
> > >> > > > > > > > Thanks. I think we agreed that the suggested solution
> > >> doesn't
> > >> > > work
> > >> > > > in
> > >> > > > > > > > general for kafka users. To answer your questions:
> > >> > > > > > > >
> > >> > > > > > > > 1. I agree we need quota to rate limit replica movement
> > >> when a
> > >> > > > broker
> > >> > > > > > is
> > >> > > > > > > > moving a "leader" replica. I will come up with solution,
> > >> > probably
> > >> > > > > > re-use
> > >> > > > > > > > the config of replication quota introduced in KIP-73.
> > >> > > > > > > >
> > >> > > > > > > > 2. Good point. I agree that this is a problem in general.
> > >> > > > > > > > If there is no new data on that broker, with the current default value of
> > >> > > > > > replica.fetch.wait.max.ms
> > >> > > > > > > > and replica.fetch.max.bytes, the replica will be moved
> at
> > >> only
> > >> > 2
> > >> > > > MBps
> > >> > > > > > > > throughput. I think the solution is for broker to set
> > >> > > > > > > > replica.fetch.wait.max.ms to 0 in its FetchRequest if
> the
> > >> > > > > > corresponding
> > >> > > > > > > > ReplicaFetcherThread needs to move some replica to
> another
> > >> > disk.
> > >> > > > > > > >
> > >> > > > > > > > 3. I have updated the KIP to mention that the read size
> > of a
> > >> > > given
> > >> > > > > > > > partition is configured using replica.fetch.max.bytes
> when
> > >> we
> > >> > > move
> > >> > > > > > > replicas
> > >> > > > > > > > between disks.
> > >> > > > > > > >
> > >> > > > > > > > Please see this
> > >> > > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5>
> > >> > > > > > > > for the change of the KIP. I will come up with a
> solution
> > to
> > >> > > > throttle
> > >> > > > > > > > replica movement when a broker is moving a "leader"
> > replica.
> > >> > > > > > >
> > >> > > > > > > Thanks. It looks great.
> > >> > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <
> > >> > > > > > aozerit...@yandex.ru>
> > >> > > > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > >>  23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:
> > >> > > > > > > >>  > Thanks. Please see my comment inline.
> > >> > > > > > > >>  >
> > >> > > > > > > >>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <
> > >> > > > > > > aozerit...@yandex.ru>
> > >> > > > > > > >>  > wrote:
> > >> > > > > > > >>  >
> > >> > > > > > > >>  >> 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com
> >:
> > >> > > > > > > >>  >> > Hey Alexey,
> > >> > > > > > > >>  >> >
> > >> > > > > > > >>  >> > Thanks for your review and the alternative
> > approach.
> > >> > Here
> > >> > > is
> > >> > > > > my
> > >> > > > > > > >>  >> > understanding of your patch. kafka's background
> > >> threads
> > >> > > are
> > >> > > > > used
> > >> > > > > > > to
> > >> > > > > > > >>  move
> > >> > > > > > > >>  >> > data between replicas. When data movement is
> > >> triggered,
> > >> > > the
> > >> > > > > log
> > >> > > > > > > will
> > >> > > > > > > >>  be
> > >> > > > > > > >>  >> > rolled and the new logs will be put in the new
> > >> > directory,
> > >> > > > and
> > >> > > > > > > >>  background
> > >> > > > > > > >>  >> > threads will move segment from old directory to
> new
> > >> > > > directory.
> > >> > > > > > > >>  >> >
> > >> > > > > > > >>  >> > It is important to note that KIP-112 is intended
> to
> > >> work
> > >> > > > with
> > >> > > > > > > >>  KIP-113 to
> > >> > > > > > > >>  >> > support JBOD. I think your solution is definitely
> > >> > simpler
> > >> > > > and
> > >> > > > > > > better
> > >> > > > > > > >>  >> under
> > >> > > > > > > >>  >> > the current kafka implementation that a broker
> will
> > >> fail
> > >> > > if
> > >> > > > > any
> > >> > > > > > > disk
> > >> > > > > > > >>  >> fails.
> > >> > > > > > > >>  >> > But I am not sure if we want to allow broker to
> run
> > >> with
> > >> > > > > partial
> > >> > > > > > > >>  disks
> > >> > > > > > > >>  >> > failure. Let's say a replica is being moved
> > from
> > >> > > > > log_dir_old
> > >> > > > > > > to
> > >> > > > > > > >>  >> > log_dir_new and then log_dir_old stops working
> due
> > to
> > >> > disk
> > >> > > > > > > failure.
> > >> > > > > > > >>  How
> > >> > > > > > > >>  >> > would your existing patch handle it? To make the
> > >> > > scenario a
> > >> > > > > bit
> > >> > > > > > > more
> > >> > > > > > > >>  >>
> > >> > > > > > > >>  >> We will lose log_dir_old. After broker restart we
> can
> > >> read
> > >> > > the
> > >> > > > > > data
> > >> > > > > > > >>  from
> > >> > > > > > > >>  >> log_dir_new.
> > >> > > > > > > >>  >
> > >> > > > > > > >>  > No, you probably can't. This is because the broker
> > >> doesn't
> > >> > > have
> > >> > > > > > > *all* the
> > >> > > > > > > >>  > data for this partition. For example, say the broker
> > has
> > >> > > > > > > >>  > partition_segment_1, partition_segment_50 and
> > >> > > > > > partition_segment_100
> > >> > > > > > > on
> > >> > > > > > > >>  the
> > >> > > > > > > >>  > log_dir_old. partition_segment_100, which has the
> > latest
> > >> > > data,
> > >> > > > > has
> > >> > > > > > > been
> > >> > > > > > > >>  > moved to log_dir_new, and the log_dir_old fails
> before
> > >> > > > > > > >>  partition_segment_50
> > >> > > > > > > >>  > and partition_segment_1 are moved to log_dir_new.
> When
> > >> > broker
> > >> > > > > > > re-starts,
> > >> > > > > > > >>  it
> > >> > > > > > > >>  > won't have partition_segment_50. This causes a problem
> > if
> > >> > > broker
> > >> > > > is
> > >> > > > > > > elected
> > >> > > > > > > >>  > leader and consumer wants to consume data in the
> > >> > > > > > partition_segment_1.
> > >> > > > > > > >>
> > >> > > > > > > >>  Right.
> > >> > > > > > > >>
> > >> > > > > > > >>  >
> > >> > > > > > > >>  >> > complicated, let's say the broker is shut down,
> > >> > > log_dir_old's
> > >> > > > > > disk
> > >> > > > > > > >>  fails,
> > >> > > > > > > >>  >> > and the broker starts. In this case broker
> doesn't
> > >> even
> > >> > > know
> > >> > > > > if
> > >> > > > > > > >>  >> log_dir_new
> > >> > > > > > > >>  >> > has all the data needed for this replica. It
> > becomes
> > >> a
> > >> > > > problem
> > >> > > > > > if
> > >> > > > > > > the
> > >> > > > > > > >>  >> > broker is elected leader of this partition in
> this
> > >> case.
> > >> > > > > > > >>  >>
> > >> > > > > > > >>  >> log_dir_new contains the most recent data so we
> will
> > >> lose
> > >> > > the
> > >> > > > > tail
> > >> > > > > > > of
> > >> > > > > > > >>  >> partition.
> > >> > > > > > > >>  >> This is not a big problem for us because we already
> > >> delete
> > >> > > > tails
> > >> > > > > > by
> > >> > > > > > > >>  hand
> > >> > > > > > > >>  >> (see https://issues.apache.org/jira/browse/KAFKA-1712).
> > >> > > > > > > >>  >> Also we don't use automatic leader balancing
> > >> > > > > > > >>  (auto.leader.rebalance.enable=false),
> > >> > > > > > > >>  >> so this partition becomes the leader with a low
> > >> > probability.
> > >> > > > > > > >>  >> I think my patch can be modified to prohibit the
> > >> selection
> > >> > > of
> > >> > > > > the
> > >> > > > > > > >>  leader
> > >> > > > > > > >>  >> until the partition has moved completely.
> > >> > > > > > > >>  >
> > >> > > > > > > >>  > I guess you are saying that you have deleted the
> tails
> > >> by
> > >> > > hand
> > >> > > > in
> > >> > > > > > > your
> > >> > > > > > > >>  own
> > >> > > > > > > >>  > kafka branch. But KAFKA-1712 is not accepted into
> > Kafka
> > >> > trunk
> > >> > > > > and I
> > >> > > > > > > am
> > >> > > > > > > >>  not
> > >> > > > > > > >>
> > >> > > > > > > >>  No. We just modify segments mtime by cron job. This
> > works
> > >> > with
> > >> > > > > > vanilla
> > >> > > > > > > >>  kafka.
> > >> > > > > > > >>
> > >> > > > > > > >>  > sure if it is the right solution. How would this
> > >> solution
> > >> > > > address
> > >> > > > > > the
> > >> > > > > > > >>  > problem mentioned above?
> > >> > > > > > > >>
> > >> > > > > > > >>  If you need only fresh data and if you remove old data
> > by
> > >> > hand
> > >> > > > > this
> > >> > > > > > is
> > >> > > > > > > >>  not a problem. But in general case
> > >> > > > > > > >>  this is a problem of course.
> > >> > > > > > > >>
> > >> > > > > > > >>  >
> > >> > > > > > > >>  > BTW, I am not sure the solution mentioned in
> > KAFKA-1712
> > >> is
> > >> > > the
> > >> > > > > > right
> > >> > > > > > > way
> > >> > > > > > > >>  to
> > >> > > > > > > >>  > address its problem. Now that we have timestamp in
> the
> > >> > > message
> > >> > > > we
> > >> > > > > > > can use
> > >> > > > > > > >>  > that to delete old segment instead of relying on
> the
> > >> log
> > >> > > > segment
> > >> > > > > > > mtime.
> > >> > > > > > > >>  > Just some idea and we don't have to discuss this
> > problem
> > >> > > here.
> > >> > > > > > > >>  >
> > >> > > > > > > >>  >> >
> > >> > > > > > > >>  >> > The solution presented in the KIP attempts to
> > handle
> > >> it
> > >> > by
> > >> > > > > > > replacing
> > >> > > > > > > >>  >> > replica in an atomic version fashion after the
> log
> > in
> > >> > the
> > >> > > > new
> > >> > > > > > dir
> > >> > > > > > > has
> > >> > > > > > > >>  >> fully
> > >> > > > > > > >>  >> > caught up with the log in the old dir. At any time
> > the
> > >> > log
> > >> > > > can
> > >> > > > > be
> > >> > > > > > > >>  >> considered
> > >> > > > > > > >>  >> > to exist on only one log directory.
> > >> > > > > > > >>  >>
> > >> > > > > > > >>  >> As I understand your solution does not cover
> quotas.
> > >> > > > > > > >>  >> What happens if someone starts to transfer 100
> > >> partitions
> > >> > ?
> > >> > > > > > > >>  >
> > >> > > > > > > >>  > Good point. Quota can be implemented in the future.
> It
> > >> is
> > >> > > > > currently
> > >> > > > > > > >>  > mentioned as a potential future improvement in KIP-112
> > >> > > > > > > >>  > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>. Thanks
> > >> > > > > > > >>  > for the reminder. I will move it to KIP-113.
> > >> > > > > > > >>  >
> > >> > > > > > > >>  >> > If yes, it will read a ByteBufferMessageSet from
> > >> > > > > > > topicPartition.log
> > >> > > > > > > >>  and
> > >> > > > > > > >>  >> append the message set to topicPartition.move
> > >> > > > > > > >>  >>
> > >> > > > > > > >>  >> i.e. processPartitionData will read data from the
> > >> > beginning
> > >> > > of
> > >> > > > > > > >>  >> topicPartition.log? What is the read size?
> > >> > > > > > > >>  >> A ReplicaFetchThread reads many partitions so if
> one
> > >> does
> > >> > > some
> > >> > > > > > > >>  complicated
> > >> > > > > > > >>  >> work (= read a lot of data from disk) everything
> will
> > >> slow
> > >> > > > down.
> > >> > > > > > > >>  >> I think read size should not be very big.
> > >> > > > > > > >>  >>
> > >> > > > > > > >>  >> On the other hand at this point
> > (processPartitionData)
> > >> one
> > >> > > can
> > >> > > > > use
> > >> > > > > > > only
> > >> > > > > > > >>  >> the new data (ByteBufferMessageSet from parameters)
> > and
> > >> > wait
> > >> > > > > until
> > >> > > > > > > >>  >> (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
> > >> > > > > > > >>  >> && topicPartition.move.largestOffset == topicPartition.log.largestOffset).
> > >> > > > > > > >>  >> In this case the write speed to topicPartition.move
> > and
> > >> > > > > > > >>  topicPartition.log
> > >> > > > > > > >>  >> will be the same so this will allow us to move many
> > >> > > partitions
> > >> > > > > to
> > >> > > > > > > one
> > >> > > > > > > >>  disk.
> > >> > > > > > > >>  >
> > >> > > > > > > >>  > The read size of a given partition is configured
> > >> > > > > > > >>  > using replica.fetch.max.bytes, which is the same
> size
> > >> used
> > >> > by
> > >> > > > > > > >>  FetchRequest
> > >> > > > > > > >>  > from follower to leader. If the broker is moving a
> > >> replica
> > >> > > for
> > >> > > > > > which
> > >> > > > > > > it
> > >> > > > > > > >>
> > >> > > > > > > >>  OK. Could you mention it in KIP?
> > >> > > > > > > >>
> > >> > > > > > > >>  > acts as a follower, the disk write rate for moving
> > this
> > >> > > replica
> > >> > > > > is
> > >> > > > > > at
> > >> > > > > > > >>  most
> > >> > > > > > > >>  > the rate it fetches from leader (assume it is
> catching
> > >> up
> > >> > and
> > >> > > > has
> > >> > > > > > > >>  > sufficient data to read from leader), which is subject to round-trip-time
> > >> > > > > > > >>  > between itself and the leader. Thus this part is
> > >> probably
> > >> > > fine
> > >> > > > > even
> > >> > > > > > > >>  without
> > >> > > > > > > >>  > quota.
> > >> > > > > > > >>
> > >> > > > > > > >>  I think there are 2 problems
> > >> > > > > > > >>  1. Without a speed limiter this will not work well even for 1 partition. In
> > >> > > > > > > >>  our production we had a problem, so we added a throughput limiter:
> > >> > > > > > > >>  https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
> > >> > > > > > > >>
> > >> > > > > > > >>  2. I don't understand how it will work in case of big
> > >> > > > > > > >>  replica.fetch.wait.max.ms and partition with
> irregular
> > >> flow.
> > >> > > > > > > >>  For example someone could have replica.fetch.wait.max.ms=10minutes and
> > >> > > > > > > >>  partition that has very high data flow from 12:00 to
> > 13:00
> > >> > and
> > >> > > > zero
> > >> > > > > > > flow
> > >> > > > > > > >>  otherwise.
> > >> > > > > > > >>  In this case processPartitionData could be called once
> > per
> > >> > > > > 10minutes
> > >> > > > > > > so if
> > >> > > > > > > >>  we start moving data at 13:01 it will be finished the next
> > >> day.
> > >> > > > > > > >>
> > >> > > > > > > >>  >
> > >> > > > > > > >>  > But if the broker is moving a replica for which it
> > acts
> > >> as
> > >> > a
> > >> > > > > > leader,
> > >> > > > > > > as
> > >> > > > > > > >>  of
> > >> > > > > > > >>  > current KIP the broker will keep reading from
> > >> log_dir_old
> > >> > and
> > >> > > > > > append
> > >> > > > > > > to
> > >> > > > > > > >>  > log_dir_new without having to wait for
> > round-trip-time.
> > >> We
> > >> > > > > probably
> > >> > > > > > > need
> > >> > > > > > > >>  > quota for this in the future.
> > >> > > > > > > >>  >
> > >> > > > > > > >>  >> >
> > >> > > > > > > >>  >> > And to answer your question, yes, topicPartition.log refers to
> > >> > > > > > > >>  >> > topic-partition/segment.log.
> > >> > > > > > > >>  >> >
> > >> > > > > > > >>  >> > Thanks,
> > >> > > > > > > >>  >> > Dong
> > >> > > > > > > >>  >> >
> > >> > > > > > > >>  >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
> > >> > > > > > > >>  >> > wrote:
> > >> > > > > > > >>  >> >
> > >> > > > > > > >>  >> >> Hi,
> > >> > > > > > > >>  >> >>
> > >> > > > > > > >>  >> >> We have a similar solution that has been working in production since
> > >> > > > > > > >>  >> >> 2014. You can see it here:
> > >> > > > > > > >>  >> >> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> > >> > > > > > > >>  >> >> The idea is very simple
> > >> > > > > > > >>  >> >> 1. Disk balancer runs in a separate thread
> inside
> > >> > > scheduler
> > >> > > > > > pool.
> > >> > > > > > > >>  >> >> 2. It does not touch empty partitions
> > >> > > > > > > >>  >> >> 3. Before it moves a partition it forcibly
> creates
> > >> new
> > >> > > > > segment
> > >> > > > > > > on a
> > >> > > > > > > >>  >> >> destination disk
> > >> > > > > > > >>  >> >> 4. It moves segment by segment from new to old.
> > >> > > > > > > >>  >> >> 5. Log class works with segments on both disks
> > >> > > > > > > >>  >> >>
> > >> > > > > > > >>  >> >> Your approach seems too complicated, moreover it
> > >> means
> > >> > > that
> > >> > > > > you
> > >> > > > > > > >>  have to
> > >> > > > > > > >>  >> >> patch different components of the system
> > >> > > > > > > >>  >> >> Could you clarify what you mean by topicPartition.log? Is it
> > >> > > > > > > >>  >> >> topic-partition/segment.log?
> > >> > > > > > > >>  >> >>
> > >> > > > > > > >>  >> >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
> > >> > > > > > > >>  >> >> > Hi all,
> > >> > > > > > > >>  >> >> >
> > >> > > > > > > >>  >> >> > We created KIP-113: Support replicas movement
> > >> between
> > >> > > log
> > >> > > > > > > >>  >> directories.
> > >> > > > > > > >>  >> >> > Please find the KIP wiki in the link
> > >> > > > > > > >>  >> >> > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
> > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.*
> > >> > > > > > > >>  >> >> >
> > >> > > > > > > >>  >> >> > This KIP is related to KIP-112
> > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> > >> > > > > > > >>  >> >> > Handle disk failure for JBOD. They are needed in order to support JBOD in
> > >> > > > > > > >>  >> >> > Kafka. Please help review the KIP. Your feedback is appreciated!
> > >> > > > > > > >>  >> >> >
> > >> > > > > > > >>  >> >> > Thanks,
> > >> > > > > > > >>  >> >> > Dong
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
