Hey Jun,

Thanks for the explanation. Please see below my thoughts.

10. I see. So you are concerned with the potential implementation
complexity which I wasn't aware of. I think it is OK not to do log
cleaning on the .move log since there can be only one such log in each
directory. I have updated the KIP to specify this:

"The log segments in topicPartition.move directory will be subject to log
truncation, log retention in the same way as the log segments in the source
log directory. But we may not do log cleaning on the topicPartition.move to
simplify the implementation."

11.2 Now I get your point. I think we have slightly different expectation
of the order in which the reassignment tools updates reassignment node in
ZK and sends ChangeReplicaDirRequest.

I think the reassignment tool should first create reassignment znode and
then keep sending ChangeReplicaDirRequest until success. I think sending
ChangeReplicaDirRequest before updating znode has negligible impact on the
chance that the broker processes ChangeReplicaDirRequest before
LeaderAndIsrRequest from controller, because the time for controller to
receive ZK notification, handle state machine changes and send
LeaderAndIsrRequests should be much longer than the time for reassignment
tool to setup connection with broker and send ChangeReplicaDirRequest. Even
if broker receives LeaderAndIsrRequest a bit sooner, the data in the
original replica should be smaller enough for .move log to catch up very
quickly, so that broker can swap the log soon after it receives
ChangeReplicaDirRequest -- otherwise the intra.broker.throttled.rate is
probably too small. Does this address your concern with the performance?

One concern with the suggested approach is that the ChangeReplicaDirRequest
may be lost if broker crashes before it creates the replica. I agree it is
rare. But it will be confusing when it happens. Operators would have to
keep verifying reassignment and possibly retry execution until success if
they want to make sure that the ChangeReplicaDirRequest is executed.

Thanks,
Dong



On Tue, Mar 21, 2017 at 8:37 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> 10. I was mainly concerned about the additional complexity needed to
> support log cleaning in the .move log. For example, LogToClean is keyed off
> TopicPartition. To be able to support cleaning different instances of the
> same partition, we need additional logic. I am not how much additional
> complexity is needed and whether it's worth it. If we don't do log cleaning
> at all on the .move log, then we don't have to change the log cleaner's
> code.
>
> 11.2 I was thinking of the following flow. In the execute phase, the
> reassignment tool first issues a ChangeReplicaDirRequest to brokers where
> new replicas will be created. The brokers remember the mapping and return a
> successful code. The reassignment tool then initiates the cross broker
> movement through the controller. In the verify phase, in addition to
> checking the replica assignment at the brokers, it issues
> DescribeDirsRequest to check the replica to log dirs mapping. For each
> partition in the response, the broker returns a state to indicate whether
> the replica is final, temporary or pending. If all replicas are in the
> final state, the tool checks if all replicas are in the expected log dirs.
> If they are not, output a warning (and perhaps suggest the users to move
> the data again). However, this should be rare.
>
> Thanks,
>
> Jun
>
>
> On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks for the response! It seems that we have only two remaining issues.
> > Please see my reply below.
> >
> > On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the update. A few replies inlined below.
> > >
> > > On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <lindon...@gmail.com>
> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks for your comment! Please see my reply below.
> > > >
> > > > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 10. Could you comment on that?
> > > > >
> > > >
> > > > Sorry, I missed that comment.
> > > >
> > > > Good point. I think the log segments in topicPartition.move directory
> > > will
> > > > be subject to log truncation, log retention and log cleaning in the
> > same
> > > > way as the log segments in the source log directory. I just specified
> > > this
> > > > inthe KIP.
> > > >
> > > >
> > > This is ok, but doubles the overhead of log cleaning. We probably want
> to
> > > think a bit more on this.
> > >
> >
> > I think this is OK because the number of replicas that are being moved is
> > limited by the number of ReplicaMoveThread. The default number of
> > ReplicaMoveThread is the number of log directories, which mean we incur
> > these overhead for at most one replica per log directory at any time.
> > Suppose there are most than 100 replica in any log directory, the
> increase
> > in overhead is less than 1%.
> >
> > Another way to look at this is that this is no worse than replica
> > reassignment. When we reassign replica from one broker to another, we
> will
> > double the overhread of log cleaning in the cluster for this replica. If
> we
> > are OK with this then we are OK with replica movement between log
> > directories.
> >
> >
> > >
> > >
> > > >
> > > > >
> > > > > 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost
> > if
> > > > > broker
> > > > > restarts after it sends ChangeReplicaDirResponse but before it
> > receives
> > > > > LeaderAndIsrRequest."
> > > > >
> > > > > In that case, the reassignment tool could detect that through
> > > > > DescribeDirsRequest
> > > > > and issue ChangeReplicaDirRequest again, right? In the common case,
> > > this
> > > > is
> > > > > probably not needed and we only need to write each replica once.
> > > > >
> > > > > My main concern with the approach in the current KIP is that once a
> > new
> > > > > replica is created in the wrong log dir, the cross log directory
> > > movement
> > > > > may not catch up until the new replica is fully bootstrapped. So,
> we
> > > end
> > > > up
> > > > > writing the data for the same replica twice.
> > > > >
> > > >
> > > > I agree with your concern. My main concern is that it is a bit weird
> if
> > > > ChangeReplicaDirResponse can not guarantee success and the tool needs
> > to
> > > > rely on DescribeDirResponse to see if it needs to send
> > > > ChangeReplicaDirRequest again.
> > > >
> > > > How about this: If broker doesn't not have already replica created
> for
> > > the
> > > > specified topicParition when it receives ChangeReplicaDirRequest, it
> > will
> > > > reply ReplicaNotAvailableException AND remember (replica, destination
> > log
> > > > directory) pair in memory to create the replica in the specified log
> > > > directory.
> > > >
> > > >
> > > I am not sure if returning ReplicaNotAvailableException is useful? What
> > > will the client do on receiving ReplicaNotAvailableException in this
> > case?
> > >
> > > Perhaps we could just replace the is_temporary field in
> > > DescribeDirsRresponsePartition with a state field. We can use 0 to
> > indicate
> > > the partition is created, 1 to indicate the partition is temporary and
> 2
> > to
> > > indicate that the partition is pending.
> > >
> >
> > ReplicaNotAvailableException is useful because the client can re-send
> > ChangeReplicaDirRequest (with backoff) after receiving
> > ReplicaNotAvailableException in the response. ChangeReplicaDirRequest
> will
> > only succeed after replica has been created for the specified partition
> in
> > the broker.
> >
> > I think this is cleaner than asking reassignment tool to detect that
> > through DescribeDirsRequest and issue ChangeReplicaDirRequest again. Both
> > solution has the same chance of writing the data for the same replica
> > twice. In the original solution, the reassignment tool will keep retrying
> > ChangeReplicaDirRequest until success. In the second suggested solution,
> > the reassignment tool needs to send ChangeReplicaDirRequest, send
> > DescribeDirsRequest to verify result, and retry ChangeReplicaDirRequest
> and
> > DescribeDirsRequest again if the replica hasn't been created already.
> Thus
> > the second solution couples ChangeReplicaDirRequest with
> > DescribeDirsRequest and makes tool's logic is bit more complicated.
> >
> > Besides, I am not sure I understand your suggestion for is_temporary
> field.
> > It seems that a replica can have only two states, i.e. normal it is being
> > used to serve fetch/produce requests and temporary if it is a replica is
> > that catching up with the normal one. If you think we should have
> > reassignment tool send DescribeDirsRequest before retrying
> > ChangeReplicaDirRequest, can you elaborate a bit what is the "pending"
> > state?
> >
> >
> > >
> > >
> > > > >
> > > > > 11.3 Are you saying the value in --throttle will be used to set
> both
> > > > > intra.broker.throttled.rate and leader.follower.replication.
> > > > > throttled.replicas?
> > > > >
> > > >
> > > > No. --throttle will be used to only to set
> leader.follower.replication
> > as
> > > > it does now. I think we do not need any option in the
> > > > kafka-reassignment-partitions.sh to specify
> > intra.broker.throttled.rate.
> > > > User canset it in broker config or dynamically using kafka-config.sh.
> > > Does
> > > > this sound OK?
> > > >
> > > >
> > > Ok. This sounds good. It would be useful to make this clear in the
> wiki.
> > >
> > > Sure. I have updated the wiki to specify this: "the quota specified by
> > the
> > argument `–throttle` will be applied to only inter-broker replica
> > reassignment. It does not affect the quota for replica movement between
> log
> > directories".
> >
> >
> > > >
> > > > >
> > > > > 12.2 If the user only wants to check one topic, the tool could do
> the
> > > > > filtering on the client side, right? My concern with having both
> > > log_dirs
> > > > > and topics is the semantic. For example, if both are not empty, do
> we
> > > > > return the intersection or the union?
> > > > >
> > > >
> > > > Yes the tool could filter on the client side. But the purpose of
> having
> > > > this field is to reduce response side in case broker has a lot of
> > topics.
> > > > The both fields are used as filter and the result is intersection. Do
> > you
> > > > think this semantic is confusing or counter-intuitive?
> > >
> > >
> > > >
> > >
> > > Ok. Could we document the semantic when both dirs and topics are
> > specified?
> > >
> >
> > Sure. I have updated the wiki to specify this: "log_dirs and topics are
> > used to filter the results to include only the specified log_dir/topic.
> The
> > result is the intersection of both filters".
> >
> >
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > >
> > > > >
> > > > > On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks much for your detailed comments. Please see my reply
> below.
> > > > > >
> > > > > > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > > > Hi, Dong,
> > > > > > >
> > > > > > > Thanks for the updated KIP. Some more comments below.
> > > > > > >
> > > > > > > 10. For the .move log, do we perform any segment deletion
> (based
> > on
> > > > > > > retention) or log cleaning (if a compacted topic)? Or do we
> only
> > > > enable
> > > > > > > that after the swap?
> > > > > > >
> > > > > > > 11. kafka-reassign-partitions.sh
> > > > > > > 11.1 If all reassigned replicas are in the current broker and
> > only
> > > > the
> > > > > > log
> > > > > > > directories have changed, we can probably optimize the tool to
> > not
> > > > > > trigger
> > > > > > > partition reassignment through the controller and only
> > > > > > > send ChangeReplicaDirRequest.
> > > > > > >
> > > > > >
> > > > > > Yes, the reassignment script should not create the reassignment
> > znode
> > > > if
> > > > > no
> > > > > > replicas are not be moved between brokers. This falls into the
> "How
> > > to
> > > > > move
> > > > > > replica between log directories on the same broker" of the
> Proposed
> > > > > Change
> > > > > > section.
> > > > > >
> > > > > >
> > > > > > > 11.2 If ChangeReplicaDirRequest specifies a replica that's not
> > > > created
> > > > > > yet,
> > > > > > > could the broker just remember that in memory and create the
> > > replica
> > > > > when
> > > > > > > the creation is requested? This way, when doing cluster
> > expansion,
> > > we
> > > > > can
> > > > > > > make sure that the new replicas on the new brokers are created
> in
> > > the
> > > > > > right
> > > > > > > log directory in the first place. We can also avoid the tool
> > having
> > > > to
> > > > > > keep
> > > > > > > issuing ChangeReplicaDirRequest in response to
> > > > > > > ReplicaNotAvailableException.
> > > > > > >
> > > > > >
> > > > > > I am concerned that the ChangeReplicaDirRequest would be lost if
> > > broker
> > > > > > restarts after it sends ChangeReplicaDirResponse but before it
> > > receives
> > > > > > LeaderAndIsrRequest. In this case, the user will receive success
> > when
> > > > > they
> > > > > > initiate replica reassignment, but replica reassignment will
> never
> > > > > complete
> > > > > > when they verify the reassignment later. This would be confusing
> to
> > > > user.
> > > > > >
> > > > > > There are three different approaches to this problem if broker
> has
> > > not
> > > > > > created replica yet after it receives ChangeReplicaDirResquest:
> > > > > >
> > > > > > 1) Broker immediately replies to user with
> > > ReplicaNotAvailableException
> > > > > and
> > > > > > user can decide to retry again later. The advantage of this
> > solution
> > > is
> > > > > > that the broker logic is very simple and the reassignment script
> > > logic
> > > > > also
> > > > > > seems straightforward. The disadvantage is that user script has
> to
> > > > retry.
> > > > > > But it seems fine - we can set interval between retries to be 0.5
> > sec
> > > > so
> > > > > > that broker want be bombarded by those requests. This is the
> > solution
> > > > > > chosen in the current KIP.
> > > > > >
> > > > > > 2) Broker can put ChangeReplicaDirRequest in a purgatory with
> > timeout
> > > > and
> > > > > > replies to user after the replica has been created. I didn't
> choose
> > > > this
> > > > > in
> > > > > > the interest of keeping broker logic simpler.
> > > > > >
> > > > > > 3) Broker can remember that by making a mark in the disk, e.g.
> > create
> > > > > > topicPartition.tomove directory in the destination log directory.
> > > This
> > > > > mark
> > > > > > will be persisted across broker restart. This is the first idea I
> > had
> > > > > but I
> > > > > > replaced it with solution 1) in the interest of keeping broker
> > > simple.
> > > > > >
> > > > > > It seems that solution 1) is the simplest one that works. But I
> am
> > OK
> > > > to
> > > > > > switch to the other two solutions if we don't want the retry
> logic.
> > > > What
> > > > > do
> > > > > > you think?
> > > > > >
> > > > > >
> > > > > > 11.3 Do we need an option in the tool to specify intra.broker.
> > > > > > > throttled.rate?
> > > > > > >
> > > > > >
> > > > > > I don't find it useful to add this option to
> > > > > kafka-reassign-partitions.sh.
> > > > > > The reason we have the option "--throttle" in the script to
> > throttle
> > > > > > replication rate is that we usually want higher quota to fix an
> > > offline
> > > > > > replica to get out of URP. But we are OK to have a lower quota if
> > we
> > > > are
> > > > > > moving replica only to balance the cluster. Thus it is common for
> > SRE
> > > > to
> > > > > > use different quota when using kafka-reassign-partitions.sh to
> move
> > > > > replica
> > > > > > between brokers.
> > > > > >
> > > > > > However, the only reason for moving replica between log
> directories
> > > of
> > > > > the
> > > > > > same broker is to balance cluster resource. Thus the option to
> > > > > > specify intra.broker.throttled.rate in the tool is not that
> > useful. I
> > > > am
> > > > > > inclined not to add this option to keep this tool's usage
> simpler.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > 12. DescribeDirsRequest
> > > > > > > 12.1 In other requests like CreateTopicRequest, we return an
> > empty
> > > > list
> > > > > > in
> > > > > > > the response for an empty input list. If the input list is
> null,
> > we
> > > > > > return
> > > > > > > everything. We should probably follow the same convention here.
> > > > > > >
> > > > > >
> > > > > > Thanks. I wasn't aware of this convention. I have change
> > > > > > DescribeDirsRequest so that "null" indicates "all".
> > > > > >
> > > > > >
> > > > > > > 12.2 Do we need the topics field? Since the request is about
> log
> > > > dirs,
> > > > > it
> > > > > > > makes sense to specify the log dirs. But it's weird to specify
> > > > topics.
> > > > > > >
> > > > > >
> > > > > > The topics field is not necessary. But it is useful to reduce the
> > > > > response
> > > > > > size in case user are only interested in the status of a few
> > topics.
> > > > For
> > > > > > example, user may have initiated the reassignment of a given
> > replica
> > > > from
> > > > > > one log directory to another log directory on the same broker,
> and
> > > the
> > > > > user
> > > > > > only wants to check the status of this given partition by looking
> > > > > > at DescribeDirsResponse. Thus this field is useful.
> > > > > >
> > > > > > I am not sure if it is weird to call this request
> > > DescribeDirsRequest.
> > > > > The
> > > > > > response is a map from log directory to information to some
> > > partitions
> > > > on
> > > > > > the log directory. Do you think we need to change the name of the
> > > > > request?
> > > > > >
> > > > > >
> > > > > > > 12.3 DescribeDirsResponsePartition: Should we include
> firstOffset
> > > and
> > > > > > > nextOffset in the response? That could be useful to track the
> > > > progress
> > > > > of
> > > > > > > the movement.
> > > > > > >
> > > > > >
> > > > > > Yeah good point. I agree it is useful to include logEndOffset in
> > the
> > > > > > response. According to Log.scala doc the logEndOffset is
> equivalent
> > > to
> > > > > the
> > > > > > nextOffset. User can track progress by checking the difference
> > > between
> > > > > > logEndOffset of the given partition in the source and destination
> > log
> > > > > > directories. I have added logEndOffset to the
> > > > > DescribeDirsResponsePartition
> > > > > > in the KIP.
> > > > > >
> > > > > > But it seems that we don't need firstOffset in the response. Do
> you
> > > > think
> > > > > > firstOffset is still needed?
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > 13. ChangeReplicaDirResponse: Do we need error code at both
> > levels?
> > > > > > >
> > > > > >
> > > > > > My bad. It is not needed. I have removed request level error
> code.
> > I
> > > > also
> > > > > > added ChangeReplicaDirRequestTopic and
> > ChangeReplicaDirResponseTopic
> > > to
> > > > > > reduce duplication of the "topic" string in the request and
> > response.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > 14. num.replica.move.threads: Does it default to # log dirs?
> > > > > > >
> > > > > >
> > > > > > No. It doesn't. I expect default number to be set to a
> conservative
> > > > value
> > > > > > such as 3. It may be surprising to user if the number of threads
> > > > increase
> > > > > > just because they have assigned more log directories to Kafka
> > broker.
> > > > > >
> > > > > > It seems that the number of replica move threads doesn't have to
> > > depend
> > > > > on
> > > > > > the number of log directories. It is possible to have one thread
> > that
> > > > > moves
> > > > > > replicas across all log directories. On the other hand we can
> have
> > > > > multiple
> > > > > > threads to move replicas to the same log directory. For example,
> if
> > > > > broker
> > > > > > uses SSD, the CPU instead of disk IO may be the replica move
> > > bottleneck
> > > > > and
> > > > > > it will be faster to move replicas using multiple threads per log
> > > > > > directory.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > I just made one correction in the KIP. If broker receives
> > > > > > > > ChangeReplicaDirRequest and the replica hasn't been created
> > > there,
> > > > > the
> > > > > > > > broker will respond ReplicaNotAvailableException.
> > > > > > > > The kafka-reassignemnt-partitions.sh will need to re-send
> > > > > > > > ChangeReplicaDirRequest in this case in order to wait for
> > > > controller
> > > > > to
> > > > > > > > send LeaderAndIsrRequest to broker. The previous approach of
> > > > creating
> > > > > > an
> > > > > > > > empty directory seems hacky.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <
> lindon...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Jun,
> > > > > > > > >
> > > > > > > > > Thanks for your comments! I have updated the KIP to address
> > > your
> > > > > > > > comments.
> > > > > > > > > Please see my reply inline.
> > > > > > > > >
> > > > > > > > > Can you let me know if the latest KIP has addressed your
> > > > comments?
> > > > > > > > >
> > > > > > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io>
> > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi, Dong,
> > > > > > > > >>
> > > > > > > > >> Thanks for the reply.
> > > > > > > > >>
> > > > > > > > >> 1.3 So the thread gets the lock, checks if caught up and
> > > > releases
> > > > > > the
> > > > > > > > lock
> > > > > > > > >> if not? Then, in the case when there is continuous
> incoming
> > > > data,
> > > > > > the
> > > > > > > > >> thread may never get a chance to swap. One way to address
> > this
> > > > is
> > > > > > when
> > > > > > > > the
> > > > > > > > >> thread is getting really close in catching up, just hold
> > onto
> > > > the
> > > > > > lock
> > > > > > > > >> until the thread fully catches up.
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > > Yes, that was my original solution. I see your point that
> the
> > > > lock
> > > > > > may
> > > > > > > > not
> > > > > > > > > be fairly assigned to ReplicaMoveThread and
> > > RequestHandlerThread
> > > > > when
> > > > > > > > there
> > > > > > > > > is frequent incoming requets. You solution should address
> the
> > > > > problem
> > > > > > > > and I
> > > > > > > > > have updated the KIP to use it.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >>
> > > > > > > > >> 2.3 So, you are saying that the partition reassignment
> tool
> > > can
> > > > > > first
> > > > > > > > send
> > > > > > > > >> a ChangeReplicaDirRequest to relevant brokers to establish
> > the
> > > > log
> > > > > > dir
> > > > > > > > for
> > > > > > > > >> replicas not created yet, then trigger the partition
> > movement
> > > > > across
> > > > > > > > >> brokers through the controller? That's actually a good
> idea.
> > > > Then,
> > > > > > we
> > > > > > > > can
> > > > > > > > >> just leave LeaderAndIsrRequest as it is.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yes, that is what I plan to do. If broker receives a
> > > > > > > > > ChangeReplicaDirRequest while it is not leader or follower
> of
> > > the
> > > > > > > > > partition, the broker will create an empty Log instance
> > (i.e. a
> > > > > > > directory
> > > > > > > > > named topicPartition) in the destination log directory so
> > that
> > > > the
> > > > > > > > replica
> > > > > > > > > will be placed there when broker receives
> LeaderAndIsrRequest
> > > > from
> > > > > > the
> > > > > > > > > broker. The broker should clean up empty those Log
> instances
> > on
> > > > > > startup
> > > > > > > > > just in case a ChangeReplicaDirRequest was mistakenly sent
> > to a
> > > > > > broker
> > > > > > > > that
> > > > > > > > > was not meant to be follower/leader of the partition..
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >> Another thing related to
> > > > > > > > >> ChangeReplicaDirRequest.
> > > > > > > > >> Since this request may take long to complete, I am not
> sure
> > if
> > > > we
> > > > > > > should
> > > > > > > > >> wait for the movement to complete before respond. While
> > > waiting
> > > > > for
> > > > > > > the
> > > > > > > > >> movement to complete, the idle connection may be killed or
> > the
> > > > > > client
> > > > > > > > may
> > > > > > > > >> be gone already. An alternative is to return immediately
> and
> > > > add a
> > > > > > new
> > > > > > > > >> request like CheckReplicaDirRequest to see if the movement
> > has
> > > > > > > > completed.
> > > > > > > > >> The tool can take advantage of that to check the status.
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > > I agree with your concern and solution. We need request to
> > > query
> > > > > the
> > > > > > > > > partition -> log_directory mapping on the broker. I have
> > > updated
> > > > > the
> > > > > > > KIP
> > > > > > > > to
> > > > > > > > > remove need for ChangeReplicaDirRequestPurgatory.
> > > > > > > > > Instead, kafka-reassignemnt-partitions.sh will send
> > > > > > > DescribeDirsRequest
> > > > > > > > > to brokers when user wants to verify the partition
> > assignment.
> > > > > Since
> > > > > > we
> > > > > > > > > need this DescribeDirsRequest anyway, we can also use this
> > > > request
> > > > > to
> > > > > > > > > expose stats like the individual log size instead of using
> > JMX.
> > > > One
> > > > > > > > > drawback of using JMX is that user has to manage the JMX
> port
> > > and
> > > > > > > related
> > > > > > > > > credentials if they haven't already done this, which is the
> > > case
> > > > at
> > > > > > > > > LinkedIn.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >> Thanks,
> > > > > > > > >>
> > > > > > > > >> Jun
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> > Hey Jun,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for the detailed explanation. I will use the
> > separate
> > > > > > thread
> > > > > > > > >> pool to
> > > > > > > > >> > move replica between log directories. I will let you
> know
> > > when
> > > > > the
> > > > > > > KIP
> > > > > > > > >> has
> > > > > > > > >> > been updated to use a separate thread pool.
> > > > > > > > >> >
> > > > > > > > >> > Here is my response to your other questions:
> > > > > > > > >> >
> > > > > > > > >> > 1.3 My idea is that the ReplicaMoveThread that moves
> data
> > > > should
> > > > > > get
> > > > > > > > the
> > > > > > > > >> > lock before checking whether the replica in the
> > destination
> > > > log
> > > > > > > > >> directory
> > > > > > > > >> > has caught up. If the new replica has caught up, then
> the
> > > > > > > > >> ReplicaMoveThread
> > > > > > > > >> > should swaps the replica while it is still holding the
> > lock.
> > > > The
> > > > > > > > >> > ReplicaFetcherThread or RequestHandlerThread will not be
> > > able
> > > > to
> > > > > > > > append
> > > > > > > > >> > data to the replica in the source replica during this
> > period
> > > > > > because
> > > > > > > > >> they
> > > > > > > > >> > can not get the lock. Does this address the problem?
> > > > > > > > >> >
> > > > > > > > >> > 2.3 I get your point that we want to keep controller
> > > simpler.
> > > > If
> > > > > > > admin
> > > > > > > > >> tool
> > > > > > > > >> > can send ChangeReplicaDirRequest to move data within a
> > > broker,
> > > > > > then
> > > > > > > > >> > controller probably doesn't even need to include log
> > > directory
> > > > > > path
> > > > > > > in
> > > > > > > > >> the
> > > > > > > > >> > LeaderAndIsrRequest. How about this: controller will
> only
> > > deal
> > > > > > with
> > > > > > > > >> > reassignment across brokers as it does now. If user
> > > specified
> > > > > > > > >> destination
> > > > > > > > >> > replica for any disk, the admin tool will send
> > > > > > > ChangeReplicaDirRequest
> > > > > > > > >> and
> > > > > > > > >> > wait for response from broker to confirm that all
> replicas
> > > > have
> > > > > > been
> > > > > > > > >> moved
> > > > > > > > >> > to the destination log direcotry. The broker will put
> > > > > > > > >> > ChangeReplicaDirRequset in a purgatory and respond
> either
> > > when
> > > > > the
> > > > > > > > >> movement
> > > > > > > > >> > is completed or when the request has timed-out.
> > > > > > > > >> >
> > > > > > > > >> > 4. I agree that we can expose these metrics via JMX.
> But I
> > > am
> > > > > not
> > > > > > > sure
> > > > > > > > >> if
> > > > > > > > >> > it can be obtained easily with good performance using
> > either
> > > > > > > existing
> > > > > > > > >> tools
> > > > > > > > >> > or new script in kafka. I will ask SREs for their
> opinion.
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> > Dong
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <
> j...@confluent.io
> > >
> > > > > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Hi, Dong,
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks for the updated KIP. A few more comments below.
> > > > > > > > >> > >
> > > > > > > > >> > > 1.1 and 1.2: I am still not sure there is enough
> benefit
> > > of
> > > > > > > reusing
> > > > > > > > >> > > ReplicaFetchThread
> > > > > > > > >> > > to move data across disks.
> > > > > > > > >> > > (a) A big part of ReplicaFetchThread is to deal with
> > > issuing
> > > > > and
> > > > > > > > >> tracking
> > > > > > > > >> > > fetch requests. So, it doesn't feel that we get much
> > from
> > > > > > reusing
> > > > > > > > >> > > ReplicaFetchThread
> > > > > > > > >> > > only to disable the fetching part.
> > > > > > > > >> > > (b) The leader replica has no ReplicaFetchThread to
> > start
> > > > > with.
> > > > > > It
> > > > > > > > >> feels
> > > > > > > > >> > > weird to start one just for intra broker data
> movement.
> > > > > > > > >> > > (c) The ReplicaFetchThread is per broker. Intuitively,
> > the
> > > > > > number
> > > > > > > of
> > > > > > > > >> > > threads doing intra broker data movement should be
> > related
> > > > to
> > > > > > the
> > > > > > > > >> number
> > > > > > > > >> > of
> > > > > > > > >> > > disks in the broker, not the number of brokers in the
> > > > cluster.
> > > > > > > > >> > > (d) If the destination disk fails, we want to stop the
> > > intra
> > > > > > > broker
> > > > > > > > >> data
> > > > > > > > >> > > movement, but want to continue inter broker
> replication.
> > > So,
> > > > > > > > >> logically,
> > > > > > > > >> > it
> > > > > > > > >> > > seems it's better to separate out the two.
> > > > > > > > >> > > (e) I am also not sure if we should reuse the existing
> > > > > > throttling
> > > > > > > > for
> > > > > > > > >> > > replication. It's designed to handle traffic across
> > > brokers
> > > > > and
> > > > > > > the
> > > > > > > > >> > > delaying is done in the fetch request. So, if we are
> not
> > > > doing
> > > > > > > > >> > > fetching in ReplicaFetchThread,
> > > > > > > > >> > > I am not sure the existing throttling is effective.
> > Also,
> > > > when
> > > > > > > > >> specifying
> > > > > > > > >> > > the throttling of moving data across disks, it seems
> the
> > > > user
> > > > > > > > >> shouldn't
> > > > > > > > >> > > care about whether a replica is a leader or a
> follower.
> > > > > Reusing
> > > > > > > the
> > > > > > > > >> > > existing throttling config name will be awkward in
> this
> > > > > regard.
> > > > > > > > >> > > (f) It seems it's simpler and more consistent to use a
> > > > > separate
> > > > > > > > thread
> > > > > > > > >> > pool
> > > > > > > > >> > > for local data movement (for both leader and follower
> > > > > replicas).
> > > > > > > > This
> > > > > > > > >> > > process can then be configured (e.g. number of
> threads,
> > > etc)
> > > > > and
> > > > > > > > >> > throttled
> > > > > > > > >> > > independently.
> > > > > > > > >> > >
> > > > > > > > >> > > 1.3 Yes, we will need some synchronization there. So,
> if
> > > the
> > > > > > > > movement
> > > > > > > > >> > > thread catches up, gets the lock to do the swap, but
> > > > realizes
> > > > > > that
> > > > > > > > new
> > > > > > > > >> > data
> > > > > > > > >> > > is added, it has to continue catching up while holding
> > the
> > > > > lock?
> > > > > > > > >> > >
> > > > > > > > >> > > 2.3 The benefit of including the desired log directory
> > in
> > > > > > > > >> > > LeaderAndIsrRequest
> > > > > > > > >> > > during partition reassignment is that the controller
> > > doesn't
> > > > > > need
> > > > > > > to
> > > > > > > > >> > track
> > > > > > > > >> > > the progress for disk movement. So, you don't need the
> > > > > > additional
> > > > > > > > >> > > BrokerDirStateUpdateRequest. Then the controller never
> > > needs
> > > > > to
> > > > > > > > issue
> > > > > > > > >> > > ChangeReplicaDirRequest.
> > > > > > > > >> > > Only the admin tool will issue ChangeReplicaDirRequest
> > to
> > > > move
> > > > > > > data
> > > > > > > > >> > within
> > > > > > > > >> > > a broker. I agree that this makes LeaderAndIsrRequest
> > more
> > > > > > > > >> complicated,
> > > > > > > > >> > but
> > > > > > > > >> > > that seems simpler than changing the controller to
> track
> > > > > > > additional
> > > > > > > > >> > states
> > > > > > > > >> > > during partition reassignment.
> > > > > > > > >> > >
> > > > > > > > >> > > 4. We want to make a decision on how to expose the
> > stats.
> > > So
> > > > > > far,
> > > > > > > we
> > > > > > > > >> are
> > > > > > > > >> > > exposing stats like the individual log size as JMX.
> So,
> > > one
> > > > > way
> > > > > > is
> > > > > > > > to
> > > > > > > > >> > just
> > > > > > > > >> > > add new jmx to expose the log directory of individual
> > > > > replicas.
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks,
> > > > > > > > >> > >
> > > > > > > > >> > > Jun
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <
> > > > > lindon...@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > Hey Jun,
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks for all the comments! Please see my answer
> > > below. I
> > > > > > have
> > > > > > > > >> updated
> > > > > > > > >> > > the
> > > > > > > > >> > > > KIP to address most of the questions and make the
> KIP
> > > > easier
> > > > > > to
> > > > > > > > >> > > understand.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks,
> > > > > > > > >> > > > Dong
> > > > > > > > >> > > >
> > > > > > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <
> > > j...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > Hi, Dong,
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Thanks for the KIP. A few comments below.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > 1. For moving data across directories
> > > > > > > > >> > > > > 1.1 I am not sure why we want to use
> > > > ReplicaFetcherThread
> > > > > to
> > > > > > > > move
> > > > > > > > >> > data
> > > > > > > > >> > > > > around in the leader. ReplicaFetchThread fetches
> > data
> > > > from
> > > > > > > > socket.
> > > > > > > > >> > For
> > > > > > > > >> > > > > moving data locally, it seems that we want to
> avoid
> > > the
> > > > > > socket
> > > > > > > > >> > > overhead.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > The purpose of using ReplicaFetchThread is to re-use
> > > > > existing
> > > > > > > > thread
> > > > > > > > >> > > > instead of creating more threads and make our thread
> > > model
> > > > > > more
> > > > > > > > >> > complex.
> > > > > > > > >> > > It
> > > > > > > > >> > > > seems like a nature choice for copying data between
> > > disks
> > > > > > since
> > > > > > > it
> > > > > > > > >> is
> > > > > > > > >> > > > similar to copying data between brokers. Another
> > reason
> > > is
> > > > > > that
> > > > > > > if
> > > > > > > > >> the
> > > > > > > > >> > > > replica to be moved is a follower, we don't need
> lock
> > to
> > > > > swap
> > > > > > > > >> replicas
> > > > > > > > >> > > when
> > > > > > > > >> > > > destination replica has caught up, since the same
> > thread
> > > > > which
> > > > > > > is
> > > > > > > > >> > > fetching
> > > > > > > > >> > > > data from leader will swap the replica.
> > > > > > > > >> > > >
> > > > > > > > >> > > > The ReplicaFetchThread will not incur socket
> overhead
> > > > while
> > > > > > > > copying
> > > > > > > > >> > data
> > > > > > > > >> > > > between disks. It will read directly from source
> disk
> > > (as
> > > > we
> > > > > > do
> > > > > > > > when
> > > > > > > > >> > > > processing FetchRequest) and write to destination
> disk
> > > (as
> > > > > we
> > > > > > do
> > > > > > > > >> when
> > > > > > > > >> > > > processing ProduceRequest).
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > > 1.2 I am also not sure about moving data in the
> > > > > > > > >> ReplicaFetcherThread
> > > > > > > > >> > in
> > > > > > > > >> > > > the
> > > > > > > > >> > > > > follower. For example, I am not sure setting
> > > > > > > > >> replica.fetch.max.wait
> > > > > > > > >> > to
> > > > > > > > >> > > 0
> > > > > > > > >> > > > >  is ideal. It may not always be effective since a
> > > fetch
> > > > > > > request
> > > > > > > > in
> > > > > > > > >> > the
> > > > > > > > >> > > > > ReplicaFetcherThread could be arbitrarily delayed
> > due
> > > to
> > > > > > > > >> replication
> > > > > > > > >> > > > > throttling on the leader. In general, the data
> > > movement
> > > > > > logic
> > > > > > > > >> across
> > > > > > > > >> > > > disks
> > > > > > > > >> > > > > seems different from that in ReplicaFetcherThread.
> > > So, I
> > > > > am
> > > > > > > not
> > > > > > > > >> sure
> > > > > > > > >> > > why
> > > > > > > > >> > > > > they need to be coupled.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > While it may not be the most efficient way to copy
> > data
> > > > > > between
> > > > > > > > >> local
> > > > > > > > >> > > > disks, it will be at least as efficient as copying
> > data
> > > > from
> > > > > > > > leader
> > > > > > > > >> to
> > > > > > > > >> > > the
> > > > > > > > >> > > > destination disk. The expected goal of KIP-113 is to
> > > > enable
> > > > > > data
> > > > > > > > >> > movement
> > > > > > > > >> > > > between disks with no less efficiency than what we
> do
> > > now
> > > > > when
> > > > > > > > >> moving
> > > > > > > > >> > > data
> > > > > > > > >> > > > between brokers. I think we can optimize its
> > performance
> > > > > using
> > > > > > > > >> separate
> > > > > > > > >> > > > thread if the performance is not good enough.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > > 1.3 Could you add a bit more details on how we
> swap
> > > the
> > > > > > > replicas
> > > > > > > > >> when
> > > > > > > > >> > > the
> > > > > > > > >> > > > > new ones are fully caught up? For example, what
> > > happens
> > > > > when
> > > > > > > the
> > > > > > > > >> new
> > > > > > > > >> > > > > replica in the new log directory is caught up, but
> > > when
> > > > we
> > > > > > > want
> > > > > > > > >> to do
> > > > > > > > >> > > the
> > > > > > > > >> > > > > swap, some new data has arrived?
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > If the replica is a leader, then
> ReplicaFetcherThread
> > > will
> > > > > > > perform
> > > > > > > > >> the
> > > > > > > > >> > > > replacement. Proper lock is needed to prevent
> > > > > > > KafkaRequestHandler
> > > > > > > > >> from
> > > > > > > > >> > > > appending data to the topicPartition.log on the
> source
> > > > disks
> > > > > > > > before
> > > > > > > > >> > this
> > > > > > > > >> > > > replacement is completed by ReplicaFetcherThread.
> > > > > > > > >> > > >
> > > > > > > > >> > > > If the replica is a follower, because the same
> > > > > > > ReplicaFetchThread
> > > > > > > > >> which
> > > > > > > > >> > > > fetches data from leader will also swap the replica
> ,
> > no
> > > > > lock
> > > > > > is
> > > > > > > > >> > needed.
> > > > > > > > >> > > >
> > > > > > > > >> > > > I have updated the KIP to specify both more
> > explicitly.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > > 1.4 Do we need to do the .move at the log segment
> > > level
> > > > or
> > > > > > > could
> > > > > > > > >> we
> > > > > > > > >> > > just
> > > > > > > > >> > > > do
> > > > > > > > >> > > > > that at the replica directory level? Renaming
> just a
> > > > > > directory
> > > > > > > > is
> > > > > > > > >> > much
> > > > > > > > >> > > > > faster than renaming the log segments.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > Great point. I have updated the KIP to rename the
> log
> > > > > > directory
> > > > > > > > >> > instead.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > > 1.5 Could you also describe a bit what happens
> when
> > > > either
> > > > > > the
> > > > > > > > >> source
> > > > > > > > >> > > or
> > > > > > > > >> > > > > the target log directory fails while the data
> moving
> > > is
> > > > in
> > > > > > > > >> progress?
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > If source log directory fails, then the replica
> > movement
> > > > > will
> > > > > > > stop
> > > > > > > > >> and
> > > > > > > > >> > > the
> > > > > > > > >> > > > source replica is marked offline. If destination log
> > > > > directory
> > > > > > > > >> fails,
> > > > > > > > >> > > then
> > > > > > > > >> > > > the replica movement will stop. I have updated the
> KIP
> > > to
> > > > > > > clarify
> > > > > > > > >> this.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > 2. For partition reassignment.
> > > > > > > > >> > > > > 2.1 I am not sure if the controller can block on
> > > > > > > > >> > > ChangeReplicaDirRequest.
> > > > > > > > >> > > > > Data movement may take a long time to complete. If
> > > there
> > > > > is
> > > > > > an
> > > > > > > > >> > > > outstanding
> > > > > > > > >> > > > > request from the controller to a broker, that
> broker
> > > > won't
> > > > > > be
> > > > > > > > >> able to
> > > > > > > > >> > > > > process any new request from the controller. So if
> > > > another
> > > > > > > event
> > > > > > > > >> > (e.g.
> > > > > > > > >> > > > > broker failure) happens when the data movement is
> in
> > > > > > progress,
> > > > > > > > >> > > subsequent
> > > > > > > > >> > > > > LeaderAnIsrRequest will be delayed.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > Yeah good point. I missed the fact that there is be
> > only
> > > > one
> > > > > > > > >> inflight
> > > > > > > > >> > > > request from controller to broker.
> > > > > > > > >> > > >
> > > > > > > > >> > > > How about I add a request, e.g.
> > > > BrokerDirStateUpdateRequest,
> > > > > > > which
> > > > > > > > >> maps
> > > > > > > > >> > > > topicPartition to log directory and can be sent from
> > > > broker
> > > > > to
> > > > > > > > >> > controller
> > > > > > > > >> > > > to indicate completion?
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > > 2.2 in the KIP, the partition reassignment tool is
> > > also
> > > > > used
> > > > > > > for
> > > > > > > > >> > cases
> > > > > > > > >> > > > > where an admin just wants to balance the existing
> > data
> > > > > > across
> > > > > > > > log
> > > > > > > > >> > > > > directories in the broker. In this case, it seems
> > that
> > > > > it's
> > > > > > > over
> > > > > > > > >> > > killing
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > have the process go through the controller. A
> > simpler
> > > > > > approach
> > > > > > > > is
> > > > > > > > >> to
> > > > > > > > >> > > > issue
> > > > > > > > >> > > > > an RPC request to the broker directly.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > I agree we can optimize this case. It is just that
> we
> > > have
> > > > > to
> > > > > > > add
> > > > > > > > >> new
> > > > > > > > >> > > logic
> > > > > > > > >> > > > or code path to handle a scenario that is already
> > > covered
> > > > by
> > > > > > the
> > > > > > > > >> more
> > > > > > > > >> > > > complicated scenario. I will add it to the KIP.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > > 2.3 When using the partition reassignment tool to
> > move
> > > > > > > replicas
> > > > > > > > >> > across
> > > > > > > > >> > > > > brokers, it make sense to be able to specify the
> log
> > > > > > directory
> > > > > > > > of
> > > > > > > > >> the
> > > > > > > > >> > > > newly
> > > > > > > > >> > > > > created replicas. The KIP does that in two
> separate
> > > > > requests
> > > > > > > > >> > > > > ChangeReplicaDirRequest and LeaderAndIsrRequest,
> and
> > > > > tracks
> > > > > > > the
> > > > > > > > >> > > progress
> > > > > > > > >> > > > of
> > > > > > > > >> > > > > each independently. An alternative is to do that
> > just
> > > in
> > > > > > > > >> > > > > LeaderAndIsrRequest.
> > > > > > > > >> > > > > That way, the new replicas will be created in the
> > > right
> > > > > log
> > > > > > > dir
> > > > > > > > in
> > > > > > > > >> > the
> > > > > > > > >> > > > > first place and the controller just needs to track
> > the
> > > > > > > progress
> > > > > > > > of
> > > > > > > > >> > > > > partition reassignment in the current way.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > I agree it is better to use one request instead of
> two
> > > to
> > > > > > > request
> > > > > > > > >> > replica
> > > > > > > > >> > > > movement between disks. But I think the performance
> > > > > advantage
> > > > > > of
> > > > > > > > >> doing
> > > > > > > > >> > so
> > > > > > > > >> > > > is negligible because we trigger replica assignment
> > much
> > > > > less
> > > > > > > than
> > > > > > > > >> all
> > > > > > > > >> > > > other kinds of events in the Kafka cluster. I am not
> > > sure
> > > > > that
> > > > > > > the
> > > > > > > > >> > > benefit
> > > > > > > > >> > > > of doing this is worth the effort to add an optional
> > > > string
> > > > > > > field
> > > > > > > > in
> > > > > > > > >> > the
> > > > > > > > >> > > > LeaderAndIsrRequest. Also if we add this optional
> > field
> > > in
> > > > > the
> > > > > > > > >> > > > LeaderAndIsrRequest, we probably want to remove
> > > > > > > > >> ChangeReplicaDirRequest
> > > > > > > > >> > > to
> > > > > > > > >> > > > avoid having two requests doing the same thing. But
> it
> > > > means
> > > > > > > user
> > > > > > > > >> > script
> > > > > > > > >> > > > can not send request directly to the broker to
> trigger
> > > > > replica
> > > > > > > > >> movement
> > > > > > > > >> > > > between log directories.
> > > > > > > > >> > > >
> > > > > > > > >> > > > I will do it if you are strong about this
> optimzation.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > 3. /admin/reassign_partitions: Including the log
> dir
> > > in
> > > > > > every
> > > > > > > > >> replica
> > > > > > > > >> > > may
> > > > > > > > >> > > > > not be efficient. We could include a list of log
> > > > > directories
> > > > > > > and
> > > > > > > > >> > > > reference
> > > > > > > > >> > > > > the index of the log directory in each replica.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > Good point. I have updated the KIP to use this
> > solution.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > 4. DescribeDirsRequest: The stats in the request
> are
> > > > > already
> > > > > > > > >> > available
> > > > > > > > >> > > > from
> > > > > > > > >> > > > > JMX. Do we need the new request?
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > Does JMX also include the state (i.e. offline or
> > online)
> > > > of
> > > > > > each
> > > > > > > > log
> > > > > > > > >> > > > directory and the log directory of each replica? If
> > not,
> > > > > then
> > > > > > > > maybe
> > > > > > > > >> we
> > > > > > > > >> > > > still need DescribeDirsRequest?
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > 5. We want to be consistent on
> > ChangeReplicaDirRequest
> > > > vs
> > > > > > > > >> > > > > ChangeReplicaRequest.
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > I think ChangeReplicaRequest and
> ChangeReplicaResponse
> > > is
> > > > my
> > > > > > > typo.
> > > > > > > > >> > Sorry,
> > > > > > > > >> > > > they are fixed now.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Thanks,
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Jun
> > > > > > > > >> > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <
> > > > > > lindon...@gmail.com
> > > > > > > >
> > > > > > > > >> > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > Hey ALexey,
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks for all the comments!
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > I have updated the KIP to specify how we enforce
> > > > quota.
> > > > > I
> > > > > > > also
> > > > > > > > >> > > updated
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > "The thread model and broker logic for moving
> > > replica
> > > > > data
> > > > > > > > >> between
> > > > > > > > >> > > log
> > > > > > > > >> > > > > > directories" to make it easier to read. You can
> > find
> > > > the
> > > > > > > exact
> > > > > > > > >> > change
> > > > > > > > >> > > > > here
> > > > > > > > >> > > > > > <https://cwiki.apache.org/conf
> > > > > > > luence/pages/diffpagesbyversio
> > > > > > > > >> > > > > > n.action?pageId=67638408&selec
> > > > > > > tedPageVersions=5&selectedPage
> > > > > > > > >> > > > Versions=6>.
> > > > > > > > >> > > > > > The idea is to use the same replication quota
> > > > mechanism
> > > > > > > > >> introduced
> > > > > > > > >> > in
> > > > > > > > >> > > > > > KIP-73.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks,
> > > > > > > > >> > > > > > Dong
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey
> Ozeritsky <
> > > > > > > > >> > > aozerit...@yandex.ru
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > wrote:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <
> > > lindon...@gmail.com
> > > > >:
> > > > > > > > >> > > > > > > > Hey Alexey,
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Thanks. I think we agreed that the suggested
> > > > > solution
> > > > > > > > >> doesn't
> > > > > > > > >> > > work
> > > > > > > > >> > > > in
> > > > > > > > >> > > > > > > > general for kafka users. To answer your
> > > questions:
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 1. I agree we need quota to rate limit
> replica
> > > > > > movement
> > > > > > > > >> when a
> > > > > > > > >> > > > broker
> > > > > > > > >> > > > > > is
> > > > > > > > >> > > > > > > > moving a "leader" replica. I will come up
> with
> > > > > > solution,
> > > > > > > > >> > probably
> > > > > > > > >> > > > > > re-use
> > > > > > > > >> > > > > > > > the config of replication quota introduced
> in
> > > > > KIP-73.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 2. Good point. I agree that this is a
> problem
> > in
> > > > > > > general.
> > > > > > > > >> If is
> > > > > > > > >> > > no
> > > > > > > > >> > > > > new
> > > > > > > > >> > > > > > > data
> > > > > > > > >> > > > > > > > on that broker, with current default value
> of
> > > > > > > > >> > > > > > replica.fetch.wait.max.ms
> > > > > > > > >> > > > > > > > and replica.fetch.max.bytes, the replica
> will
> > be
> > > > > moved
> > > > > > > at
> > > > > > > > >> only
> > > > > > > > >> > 2
> > > > > > > > >> > > > MBps
> > > > > > > > >> > > > > > > > throughput. I think the solution is for
> broker
> > > to
> > > > > set
> > > > > > > > >> > > > > > > > replica.fetch.wait.max.ms to 0 in its
> > > > FetchRequest
> > > > > if
> > > > > > > the
> > > > > > > > >> > > > > > corresponding
> > > > > > > > >> > > > > > > > ReplicaFetcherThread needs to move some
> > replica
> > > to
> > > > > > > another
> > > > > > > > >> > disk.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > 3. I have updated the KIP to mention that
> the
> > > read
> > > > > > size
> > > > > > > > of a
> > > > > > > > >> > > given
> > > > > > > > >> > > > > > > > partition is configured using
> > > > > replica.fetch.max.bytes
> > > > > > > when
> > > > > > > > >> we
> > > > > > > > >> > > move
> > > > > > > > >> > > > > > > replicas
> > > > > > > > >> > > > > > > > between disks.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Please see this
> > > > > > > > >> > > > > > > > <https://cwiki.apache.org/conf
> > > > > > > > >> luence/pages/diffpagesbyversio
> > > > > > > > >> > > > n.action
> > > > > > > > >> > > > > ?
> > > > > > > > >> > > > > > > pageId=67638408&selectedPageVe
> > > > > > > > rsions=4&selectedPageVersions=
> > > > > > > > >> 5>
> > > > > > > > >> > > > > > > > for the change of the KIP. I will come up
> > with a
> > > > > > > solution
> > > > > > > > to
> > > > > > > > >> > > > throttle
> > > > > > > > >> > > > > > > > replica movement when a broker is moving a
> > > > "leader"
> > > > > > > > replica.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Thanks. It looks great.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey
> > > Ozeritsky
> > > > <
> > > > > > > > >> > > > > > aozerit...@yandex.ru>
> > > > > > > > >> > > > > > > > wrote:
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >>  23.01.2017, 22:11, "Dong Lin" <
> > > > > lindon...@gmail.com
> > > > > > >:
> > > > > > > > >> > > > > > > >>  > Thanks. Please see my comment inline.
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey
> > > > > Ozeritsky
> > > > > > <
> > > > > > > > >> > > > > > > aozerit...@yandex.ru>
> > > > > > > > >> > > > > > > >>  > wrote:
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  >> 13.01.2017, 22:29, "Dong Lin" <
> > > > > > lindon...@gmail.com
> > > > > > > >:
> > > > > > > > >> > > > > > > >>  >> > Hey Alexey,
> > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > >> > > > > > > >>  >> > Thanks for your review and the
> > > alternative
> > > > > > > > approach.
> > > > > > > > >> > Here
> > > > > > > > >> > > is
> > > > > > > > >> > > > > my
> > > > > > > > >> > > > > > > >>  >> > understanding of your patch. kafka's
> > > > > background
> > > > > > > > >> threads
> > > > > > > > >> > > are
> > > > > > > > >> > > > > used
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > >>  move
> > > > > > > > >> > > > > > > >>  >> > data between replicas. When data
> > movement
> > > > is
> > > > > > > > >> triggered,
> > > > > > > > >> > > the
> > > > > > > > >> > > > > log
> > > > > > > > >> > > > > > > will
> > > > > > > > >> > > > > > > >>  be
> > > > > > > > >> > > > > > > >>  >> > rolled and the new logs will be put
> in
> > > the
> > > > > new
> > > > > > > > >> > directory,
> > > > > > > > >> > > > and
> > > > > > > > >> > > > > > > >>  background
> > > > > > > > >> > > > > > > >>  >> > threads will move segment from old
> > > > directory
> > > > > to
> > > > > > > new
> > > > > > > > >> > > > directory.
> > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > >> > > > > > > >>  >> > It is important to note that KIP-112
> is
> > > > > > intended
> > > > > > > to
> > > > > > > > >> work
> > > > > > > > >> > > > with
> > > > > > > > >> > > > > > > >>  KIP-113 to
> > > > > > > > >> > > > > > > >>  >> > support JBOD. I think your solution
> is
> > > > > > definitely
> > > > > > > > >> > simpler
> > > > > > > > >> > > > and
> > > > > > > > >> > > > > > > better
> > > > > > > > >> > > > > > > >>  >> under
> > > > > > > > >> > > > > > > >>  >> > the current kafka implementation
> that a
> > > > > broker
> > > > > > > will
> > > > > > > > >> fail
> > > > > > > > >> > > if
> > > > > > > > >> > > > > any
> > > > > > > > >> > > > > > > disk
> > > > > > > > >> > > > > > > >>  >> fails.
> > > > > > > > >> > > > > > > >>  >> > But I am not sure if we want to allow
> > > > broker
> > > > > to
> > > > > > > run
> > > > > > > > >> with
> > > > > > > > >> > > > > partial
> > > > > > > > >> > > > > > > >>  disks
> > > > > > > > >> > > > > > > >>  >> > failure. Let's say the a replica is
> > being
> > > > > moved
> > > > > > > > from
> > > > > > > > >> > > > > log_dir_old
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > >>  >> > log_dir_new and then log_dir_old
> stops
> > > > > working
> > > > > > > due
> > > > > > > > to
> > > > > > > > >> > disk
> > > > > > > > >> > > > > > > failure.
> > > > > > > > >> > > > > > > >>  How
> > > > > > > > >> > > > > > > >>  >> > would your existing patch handles it?
> > To
> > > > make
> > > > > > the
> > > > > > > > >> > > scenario a
> > > > > > > > >> > > > > bit
> > > > > > > > >> > > > > > > more
> > > > > > > > >> > > > > > > >>  >>
> > > > > > > > >> > > > > > > >>  >> We will lose log_dir_old. After broker
> > > > restart
> > > > > we
> > > > > > > can
> > > > > > > > >> read
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > data
> > > > > > > > >> > > > > > > >>  from
> > > > > > > > >> > > > > > > >>  >> log_dir_new.
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  > No, you probably can't. This is because
> > the
> > > > > broker
> > > > > > > > >> doesn't
> > > > > > > > >> > > have
> > > > > > > > >> > > > > > > *all* the
> > > > > > > > >> > > > > > > >>  > data for this partition. For example,
> say
> > > the
> > > > > > broker
> > > > > > > > has
> > > > > > > > >> > > > > > > >>  > partition_segement_1,
> partition_segment_50
> > > and
> > > > > > > > >> > > > > > partition_segment_100
> > > > > > > > >> > > > > > > on
> > > > > > > > >> > > > > > > >>  the
> > > > > > > > >> > > > > > > >>  > log_dir_old. partition_segment_100,
> which
> > > has
> > > > > the
> > > > > > > > latest
> > > > > > > > >> > > data,
> > > > > > > > >> > > > > has
> > > > > > > > >> > > > > > > been
> > > > > > > > >> > > > > > > >>  > moved to log_dir_new, and the
> log_dir_old
> > > > fails
> > > > > > > before
> > > > > > > > >> > > > > > > >>  partition_segment_50
> > > > > > > > >> > > > > > > >>  > and partition_segment_1 is moved to
> > > > log_dir_new.
> > > > > > > When
> > > > > > > > >> > broker
> > > > > > > > >> > > > > > > re-starts,
> > > > > > > > >> > > > > > > >>  it
> > > > > > > > >> > > > > > > >>  > won't have partition_segment_50. This
> > causes
> > > > > > problem
> > > > > > > > if
> > > > > > > > >> > > broker
> > > > > > > > >> > > > is
> > > > > > > > >> > > > > > > elected
> > > > > > > > >> > > > > > > >>  > leader and consumer wants to consume
> data
> > in
> > > > the
> > > > > > > > >> > > > > > partition_segment_1.
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  Right.
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  >> > complicated, let's say the broker is
> > > > > shtudown,
> > > > > > > > >> > > log_dir_old's
> > > > > > > > >> > > > > > disk
> > > > > > > > >> > > > > > > >>  fails,
> > > > > > > > >> > > > > > > >>  >> > and the broker starts. In this case
> > > broker
> > > > > > > doesn't
> > > > > > > > >> even
> > > > > > > > >> > > know
> > > > > > > > >> > > > > if
> > > > > > > > >> > > > > > > >>  >> log_dir_new
> > > > > > > > >> > > > > > > >>  >> > has all the data needed for this
> > replica.
> > > > It
> > > > > > > > becomes
> > > > > > > > >> a
> > > > > > > > >> > > > problem
> > > > > > > > >> > > > > > if
> > > > > > > > >> > > > > > > the
> > > > > > > > >> > > > > > > >>  >> > broker is elected leader of this
> > > partition
> > > > in
> > > > > > > this
> > > > > > > > >> case.
> > > > > > > > >> > > > > > > >>  >>
> > > > > > > > >> > > > > > > >>  >> log_dir_new contains the most recent
> data
> > > so
> > > > we
> > > > > > > will
> > > > > > > > >> lose
> > > > > > > > >> > > the
> > > > > > > > >> > > > > tail
> > > > > > > > >> > > > > > > of
> > > > > > > > >> > > > > > > >>  >> partition.
> > > > > > > > >> > > > > > > >>  >> This is not a big problem for us
> because
> > we
> > > > > > already
> > > > > > > > >> delete
> > > > > > > > >> > > > tails
> > > > > > > > >> > > > > > by
> > > > > > > > >> > > > > > > >>  hand
> > > > > > > > >> > > > > > > >>  >> (see https://issues.apache.org/jira
> > > > > > > > /browse/KAFKA-1712
> > > > > > > > >> ).
> > > > > > > > >> > > > > > > >>  >> Also we dont use authomatic leader
> > > balancing
> > > > > > > > >> > > > > > > >>  (auto.leader.rebalance.enable=false),
> > > > > > > > >> > > > > > > >>  >> so this partition becomes the leader
> > with a
> > > > low
> > > > > > > > >> > probability.
> > > > > > > > >> > > > > > > >>  >> I think my patch can be modified to
> > > prohibit
> > > > > the
> > > > > > > > >> selection
> > > > > > > > >> > > of
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > >>  leader
> > > > > > > > >> > > > > > > >>  >> until the partition does not move
> > > completely.
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  > I guess you are saying that you have
> > deleted
> > > > the
> > > > > > > tails
> > > > > > > > >> by
> > > > > > > > >> > > hand
> > > > > > > > >> > > > in
> > > > > > > > >> > > > > > > your
> > > > > > > > >> > > > > > > >>  own
> > > > > > > > >> > > > > > > >>  > kafka branch. But KAFKA-1712 is not
> > accepted
> > > > > into
> > > > > > > > Kafka
> > > > > > > > >> > trunk
> > > > > > > > >> > > > > and I
> > > > > > > > >> > > > > > > am
> > > > > > > > >> > > > > > > >>  not
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  No. We just modify segments mtime by cron
> > job.
> > > > > This
> > > > > > > > works
> > > > > > > > >> > with
> > > > > > > > >> > > > > > vanilla
> > > > > > > > >> > > > > > > >>  kafka.
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  > sure if it is the right solution. How
> > would
> > > > this
> > > > > > > > >> solution
> > > > > > > > >> > > > address
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > >>  > problem mentioned above?
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  If you need only fresh data and if you
> > remove
> > > > old
> > > > > > data
> > > > > > > > by
> > > > > > > > >> > hands
> > > > > > > > >> > > > > this
> > > > > > > > >> > > > > > is
> > > > > > > > >> > > > > > > >>  not a problem. But in general case
> > > > > > > > >> > > > > > > >>  this is a problem of course.
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  > BTW, I am not sure the solution
> mentioned
> > in
> > > > > > > > KAFKA-1712
> > > > > > > > >> is
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > right
> > > > > > > > >> > > > > > > way
> > > > > > > > >> > > > > > > >>  to
> > > > > > > > >> > > > > > > >>  > address its problem. Now that we have
> > > > timestamp
> > > > > in
> > > > > > > the
> > > > > > > > >> > > message
> > > > > > > > >> > > > we
> > > > > > > > >> > > > > > > can use
> > > > > > > > >> > > > > > > >>  > that to delete old segement instead of
> > > relying
> > > > > on
> > > > > > > the
> > > > > > > > >> log
> > > > > > > > >> > > > segment
> > > > > > > > >> > > > > > > mtime.
> > > > > > > > >> > > > > > > >>  > Just some idea and we don't have to
> > discuss
> > > > this
> > > > > > > > problem
> > > > > > > > >> > > here.
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > >> > > > > > > >>  >> > The solution presented in the KIP
> > > attempts
> > > > to
> > > > > > > > handle
> > > > > > > > >> it
> > > > > > > > >> > by
> > > > > > > > >> > > > > > > replacing
> > > > > > > > >> > > > > > > >>  >> > replica in an atomic version fashion
> > > after
> > > > > the
> > > > > > > log
> > > > > > > > in
> > > > > > > > >> > the
> > > > > > > > >> > > > new
> > > > > > > > >> > > > > > dir
> > > > > > > > >> > > > > > > has
> > > > > > > > >> > > > > > > >>  >> fully
> > > > > > > > >> > > > > > > >>  >> > caught up with the log in the old
> dir.
> > At
> > > > at
> > > > > > time
> > > > > > > > the
> > > > > > > > >> > log
> > > > > > > > >> > > > can
> > > > > > > > >> > > > > be
> > > > > > > > >> > > > > > > >>  >> considered
> > > > > > > > >> > > > > > > >>  >> > to exist on only one log directory.
> > > > > > > > >> > > > > > > >>  >>
> > > > > > > > >> > > > > > > >>  >> As I understand your solution does not
> > > cover
> > > > > > > quotas.
> > > > > > > > >> > > > > > > >>  >> What happens if someone starts to
> > transfer
> > > > 100
> > > > > > > > >> partitions
> > > > > > > > >> > ?
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  > Good point. Quota can be implemented in
> > the
> > > > > > future.
> > > > > > > It
> > > > > > > > >> is
> > > > > > > > >> > > > > currently
> > > > > > > > >> > > > > > > >>  > mentioned as as a potential future
> > > improvement
> > > > > in
> > > > > > > > >> KIP-112
> > > > > > > > >> > > > > > > >>  > <https://cwiki.apache.org/conf
> > > > > > > > luence/display/KAFKA/KIP-
> > > > > > > > >> > 112%3
> > > > > > > > >> > > > > > > >>  A+Handle+disk+failure+for+JBOD>.Thanks
> > > > > > > > >> > > > > > > >>  > for the reminder. I will move it to
> > KIP-113.
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  >> > If yes, it will read a
> > > ByteBufferMessageSet
> > > > > > from
> > > > > > > > >> > > > > > > topicPartition.log
> > > > > > > > >> > > > > > > >>  and
> > > > > > > > >> > > > > > > >>  >> append the message set to
> > > topicPartition.move
> > > > > > > > >> > > > > > > >>  >>
> > > > > > > > >> > > > > > > >>  >> i.e. processPartitionData will read
> data
> > > from
> > > > > the
> > > > > > > > >> > beginning
> > > > > > > > >> > > of
> > > > > > > > >> > > > > > > >>  >> topicPartition.log? What is the read
> > size?
> > > > > > > > >> > > > > > > >>  >> A ReplicaFetchThread reads many
> > partitions
> > > so
> > > > > if
> > > > > > > one
> > > > > > > > >> does
> > > > > > > > >> > > some
> > > > > > > > >> > > > > > > >>  complicated
> > > > > > > > >> > > > > > > >>  >> work (= read a lot of data from disk)
> > > > > everything
> > > > > > > will
> > > > > > > > >> slow
> > > > > > > > >> > > > down.
> > > > > > > > >> > > > > > > >>  >> I think read size should not be very
> big.
> > > > > > > > >> > > > > > > >>  >>
> > > > > > > > >> > > > > > > >>  >> On the other hand at this point
> > > > > > > > (processPartitionData)
> > > > > > > > >> one
> > > > > > > > >> > > can
> > > > > > > > >> > > > > use
> > > > > > > > >> > > > > > > only
> > > > > > > > >> > > > > > > >>  >> the new data (ByteBufferMessageSet from
> > > > > > parameters)
> > > > > > > > and
> > > > > > > > >> > wait
> > > > > > > > >> > > > > until
> > > > > > > > >> > > > > > > >>  >> (topicPartition.move.smallestOffset <=
> > > > > > > > >> > > > > > > topicPartition.log.smallestOff
> > > > > > > > >> > > > > > > >>  set
> > > > > > > > >> > > > > > > >>  >> && topicPartition.log.largestOffset ==
> > > > > > > > >> > > > > > > topicPartition.log.largestOffs
> > > > > > > > >> > > > > > > >>  et).
> > > > > > > > >> > > > > > > >>  >> In this case the write speed to
> > > > > > topicPartition.move
> > > > > > > > and
> > > > > > > > >> > > > > > > >>  topicPartition.log
> > > > > > > > >> > > > > > > >>  >> will be the same so this will allow us
> to
> > > > move
> > > > > > many
> > > > > > > > >> > > partitions
> > > > > > > > >> > > > > to
> > > > > > > > >> > > > > > > one
> > > > > > > > >> > > > > > > >>  disk.
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  > The read size of a given partition is
> > > > configured
> > > > > > > > >> > > > > > > >>  > using replica.fetch.max.bytes, which is
> > the
> > > > same
> > > > > > > size
> > > > > > > > >> used
> > > > > > > > >> > by
> > > > > > > > >> > > > > > > >>  FetchRequest
> > > > > > > > >> > > > > > > >>  > from follower to leader. If the broker
> is
> > > > > moving a
> > > > > > > > >> replica
> > > > > > > > >> > > for
> > > > > > > > >> > > > > > which
> > > > > > > > >> > > > > > > it
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  OK. Could you mention it in KIP?
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  > acts as a follower, the disk write rate
> > for
> > > > > moving
> > > > > > > > this
> > > > > > > > >> > > replica
> > > > > > > > >> > > > > is
> > > > > > > > >> > > > > > at
> > > > > > > > >> > > > > > > >>  most
> > > > > > > > >> > > > > > > >>  > the rate it fetches from leader (assume
> it
> > > is
> > > > > > > catching
> > > > > > > > >> up
> > > > > > > > >> > and
> > > > > > > > >> > > > has
> > > > > > > > >> > > > > > > >>  > sufficient data to read from leader,
> which
> > > is
> > > > > > > subject
> > > > > > > > to
> > > > > > > > >> > > > > > > round-trip-time
> > > > > > > > >> > > > > > > >>  > between itself and the leader. Thus this
> > > part
> > > > if
> > > > > > > > >> probably
> > > > > > > > >> > > fine
> > > > > > > > >> > > > > even
> > > > > > > > >> > > > > > > >>  without
> > > > > > > > >> > > > > > > >>  > quota.
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  I think there are 2 problems
> > > > > > > > >> > > > > > > >>  1. Without speed limiter this will not
> work
> > > good
> > > > > > even
> > > > > > > > for
> > > > > > > > >> 1
> > > > > > > > >> > > > > > partition.
> > > > > > > > >> > > > > > > In
> > > > > > > > >> > > > > > > >>  our production we had a problem so we did
> > the
> > > > > > throuput
> > > > > > > > >> > limiter:
> > > > > > > > >> > > > > > > >>  https://github.com/resetius/ka
> > > > > > > > >> fka/commit/cda31dadb2f135743bf
> > > > > > > > >> > > > > > > >>  41083062927886c5ddce1#diff-ffa
> > > > > > > > >> 8861e850121997a534ebdde2929c6R
> > > > > > > > >> > > 713
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  2. I dont understand how it will work in
> > case
> > > of
> > > > > big
> > > > > > > > >> > > > > > > >>  replica.fetch.wait.max.ms and partition
> > with
> > > > > > > irregular
> > > > > > > > >> flow.
> > > > > > > > >> > > > > > > >>  For example someone could have
> > > > > > > > replica.fetch.wait.max.ms
> > > > > > > > >> > =10mi
> > > > > > > > >> > > > nutes
> > > > > > > > >> > > > > > and
> > > > > > > > >> > > > > > > >>  partition that has very high data flow
> from
> > > > 12:00
> > > > > to
> > > > > > > > 13:00
> > > > > > > > >> > and
> > > > > > > > >> > > > zero
> > > > > > > > >> > > > > > > flow
> > > > > > > > >> > > > > > > >>  otherwise.
> > > > > > > > >> > > > > > > >>  In this case processPartitionData could be
> > > > called
> > > > > > once
> > > > > > > > per
> > > > > > > > >> > > > > 10minutes
> > > > > > > > >> > > > > > > so if
> > > > > > > > >> > > > > > > >>  we start data moving in 13:01 it will be
> > > > finished
> > > > > > next
> > > > > > > > >> day.
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  > But ff the broker is moving a replica
> for
> > > > which
> > > > > it
> > > > > > > > acts
> > > > > > > > >> as
> > > > > > > > >> > a
> > > > > > > > >> > > > > > leader,
> > > > > > > > >> > > > > > > as
> > > > > > > > >> > > > > > > >>  of
> > > > > > > > >> > > > > > > >>  > current KIP the broker will keep reading
> > > from
> > > > > > > > >> log_dir_old
> > > > > > > > >> > and
> > > > > > > > >> > > > > > append
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > >>  > log_dir_new without having to wait for
> > > > > > > > round-trip-time.
> > > > > > > > >> We
> > > > > > > > >> > > > > probably
> > > > > > > > >> > > > > > > need
> > > > > > > > >> > > > > > > >>  > quota for this in the future.
> > > > > > > > >> > > > > > > >>  >
> > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > >> > > > > > > >>  >> > And to answer your question, yes
> > > > > > > topicpartition.log
> > > > > > > > >> > refers
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > > > >>  >> > topic-paritition/segment.log.
> > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > >> > > > > > > >>  >> > Thanks,
> > > > > > > > >> > > > > > > >>  >> > Dong
> > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > >> > > > > > > >>  >> > On Fri, Jan 13, 2017 at 4:12 AM,
> Alexey
> > > > > > > Ozeritsky <
> > > > > > > > >> > > > > > > >>  aozerit...@yandex.ru>
> > > > > > > > >> > > > > > > >>  >> > wrote:
> > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > >> > > > > > > >>  >> >> Hi,
> > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > >> > > > > > > >>  >> >> We have the similar solution that
> have
> > > > been
> > > > > > > > working
> > > > > > > > >> in
> > > > > > > > >> > > > > > production
> > > > > > > > >> > > > > > > >>  since
> > > > > > > > >> > > > > > > >>  >> >> 2014. You can see it here:
> > > > > > > > >> > > https://github.com/resetius/ka
> > > > > > > > >> > > > > > > >>  >> >> fka/commit/20658593e246d218490
> > > > > > > > 6879defa2e763c4d413fb
> > > > > > > > >> > > > > > > >>  >> >> The idea is very simple
> > > > > > > > >> > > > > > > >>  >> >> 1. Disk balancer runs in a separate
> > > thread
> > > > > > > inside
> > > > > > > > >> > > scheduler
> > > > > > > > >> > > > > > pool.
> > > > > > > > >> > > > > > > >>  >> >> 2. It does not touch empty
> partitions
> > > > > > > > >> > > > > > > >>  >> >> 3. Before it moves a partition it
> > > forcibly
> > > > > > > creates
> > > > > > > > >> new
> > > > > > > > >> > > > > segment
> > > > > > > > >> > > > > > > on a
> > > > > > > > >> > > > > > > >>  >> >> destination disk
> > > > > > > > >> > > > > > > >>  >> >> 4. It moves segment by segment from
> > new
> > > to
> > > > > > old.
> > > > > > > > >> > > > > > > >>  >> >> 5. Log class works with segments on
> > both
> > > > > disks
> > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > >> > > > > > > >>  >> >> Your approach seems too complicated,
> > > > > moreover
> > > > > > it
> > > > > > > > >> means
> > > > > > > > >> > > that
> > > > > > > > >> > > > > you
> > > > > > > > >> > > > > > > >>  have to
> > > > > > > > >> > > > > > > >>  >> >> patch different components of the
> > system
> > > > > > > > >> > > > > > > >>  >> >> Could you clarify what do you mean
> by
> > > > > > > > >> > topicPartition.log?
> > > > > > > > >> > > > Is
> > > > > > > > >> > > > > it
> > > > > > > > >> > > > > > > >>  >> >> topic-paritition/segment.log ?
> > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > >> > > > > > > >>  >> >> 12.01.2017, 21:47, "Dong Lin" <
> > > > > > > > lindon...@gmail.com
> > > > > > > > >> >:
> > > > > > > > >> > > > > > > >>  >> >> > Hi all,
> > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > >> > > > > > > >>  >> >> > We created KIP-113: Support
> replicas
> > > > > > movement
> > > > > > > > >> between
> > > > > > > > >> > > log
> > > > > > > > >> > > > > > > >>  >> directories.
> > > > > > > > >> > > > > > > >>  >> >> > Please find the KIP wiki in the
> link
> > > > > > > > >> > > > > > > >>  >> >> > *https://cwiki.apache.org/conf
> > > > > > > > >> > > > > luence/display/KAFKA/KIP-113%
> > > > > > > > >> > > > > > > >>  >> >> 3A+Support+replicas+movement+b
> > > > > > > > >> etween+log+directories
> > > > > > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/conf
> > > > > > > > >> > > > > luence/display/KAFKA/KIP-113%
> > > > > > > > >> > > > > > > >>  >> >> 3A+Support+replicas+movement+
> > > > > > > > >> > between+log+directories>.*
> > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > >> > > > > > > >>  >> >> > This KIP is related to KIP-112
> > > > > > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/conf
> > > > > > > > >> > > > > luence/display/KAFKA/KIP-112%
> > > > > > > > >> > > > > > > >>  >> >> 3A+Handle+disk+failure+for+JBOD>:
> > > > > > > > >> > > > > > > >>  >> >> > Handle disk failure for JBOD. They
> > are
> > > > > > needed
> > > > > > > in
> > > > > > > > >> > order
> > > > > > > > >> > > to
> > > > > > > > >> > > > > > > support
> > > > > > > > >> > > > > > > >>  >> JBOD in
> > > > > > > > >> > > > > > > >>  >> >> > Kafka. Please help review the KIP.
> > You
> > > > > > > feedback
> > > > > > > > is
> > > > > > > > >> > > > > > appreciated!
> > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > >> > > > > > > >>  >> >> > Thanks,
> > > > > > > > >> > > > > > > >>  >> >> > Dong
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to