Hi, Dong,

We are just comparing whether it's better for the reassignment tool to
send ChangeReplicaDirRequest
(1) before or (2) after writing the reassignment path in ZK.

In the case when all brokers are alive when the reassignment tool is run,
(1) guarantees 100% that the new replicas will be in the right log dirs and
(2) can't.

In the rarer case that some brokers go down immediately after the
reassignment tool is run, in either approach, there is a chance that when
the failed broker comes back, it will complete the pending reassignment
process by putting some replicas in the wrong log dirs.

Implementation-wise, (1) and (2) seem to be the same. So, it seems to me
that (1) is better?
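
To make the ordering in (1) concrete, here is a minimal sketch in Python. It
is only an illustration under stated assumptions: FakeBroker,
change_replica_dir, on_leader_and_isr and write_reassignment_znode are
made-up stand-ins, not Kafka APIs, and the 10 second timeout and 0.5 second
backoff are simply the values mentioned elsewhere in this thread.

```python
import time

# Hypothetical response codes; the names are assumptions for this sketch.
NONE = "NONE"
REPLICA_NOT_AVAILABLE = "REPLICA_NOT_AVAILABLE"


class FakeBroker:
    """In-memory stand-in for a broker (not a Kafka API)."""

    def __init__(self):
        self.pending_dirs = {}  # replica -> log dir remembered before creation
        self.replica_dirs = {}  # replica -> log dir of replicas that exist

    def change_replica_dir(self, replica, log_dir):
        if replica in self.replica_dirs:
            # Replica exists: record the target dir (a real broker would move data).
            self.replica_dirs[replica] = log_dir
            return NONE
        # Replica not created yet: remember the destination, ask caller to retry.
        self.pending_dirs[replica] = log_dir
        return REPLICA_NOT_AVAILABLE

    def on_leader_and_isr(self, replica, default_dir):
        # Create the replica in the remembered dir, else in the default dir.
        self.replica_dirs[replica] = self.pending_dirs.pop(replica, default_dir)


def reassign(broker, replica, log_dir, write_reassignment_znode,
             timeout_s=10.0, backoff_s=0.5,
             sleep=time.sleep, clock=time.monotonic):
    # Step 1: tell the broker the target log dir before touching ZK.
    broker.change_replica_dir(replica, log_dir)
    # Step 2: only then trigger the cross-broker movement.
    write_reassignment_znode(replica)
    # Step 3: retry until the broker reports success or the timeout expires.
    deadline = clock() + timeout_s
    while True:
        if broker.change_replica_dir(replica, log_dir) == NONE:
            return True
        if clock() >= deadline:
            return False
        sleep(backoff_s)
```

In this sketch, if the broker stays up between steps 1 and 2, the replica is
created directly in the requested log dir; if it restarts in between, the
in-memory mapping is lost and only the retry in step 3 can still correct the
placement.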

Thanks,

Jun


On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks much for the response! I agree with you that if multiple replicas
> are created in the wrong directory, we may waste resources if either the
> replicaMoveThread number is low or intra.broker.throttled.rate is low.
> Then the question is whether the suggested approach increases the chance of
> the replica being created in the correct log directory.
>
> I think the answer is no due to the argument provided in the previous
> email. Sending ChangeReplicaDirRequest before updating znode has negligible
> impact on the chance that the broker processes ChangeReplicaDirRequest
> before LeaderAndIsrRequest from controller. If we still worry about the
> order they are sent, the reassignment tool can first send
> ChangeReplicaDirRequest (so that broker remembers it in memory), create
> reassignment znode, and then retry ChangeReplicaDirRequest if the previous
> ChangeReplicaDirResponse says the replica has not been created. This should
> give us the highest possible chance of creating replica in the correct
> directory and avoid the problem of the suggested approach. I have updated
> "How to reassign replica between log directories across brokers" in the
> KIP to explain this procedure.
>
> To answer your question, the reassignment tool should fail with a proper
> error message if the user has specified a log directory for a replica on
> an offline broker. This is reasonable because the reassignment tool cannot
> guarantee that the replica will be moved to the specified log directory if
> the broker is offline. If all brokers are online, the reassignment tool may
> hang for up to 10 seconds (by default) to retry ChangeReplicaDirRequest if
> any replica has not been created yet. The user can change this timeout
> value using the newly-added --timeout argument of the reassignment tool.
> This is specified in the Public Interface section in the KIP. The
> reassignment tool will only block if the user uses this new feature of
> reassigning a replica to a specific log directory in the broker. Therefore
> it seems backward compatible.
>
> Does this address the concern?
>
> Thanks,
> Dong
>
> On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > 11.2 I think there are a few reasons why the cross disk movement may not
> > catch up if the replicas are created in the wrong log dirs to start with.
> > (a) There could be more replica fetcher threads than the disk movement
> > threads. (b) intra.broker.throttled.rate may be configured lower than the
> > replica throttle rate. That's why I think getting the replicas created in
> > the right log dirs will be better.
> >
> > For the corner case issue that you mentioned, I am not sure if the
> > approach in the KIP completely avoids that. If a broker is down when the
> > partition reassignment tool is started, does the tool just hang (keep
> > retrying ChangeReplicaDirRequest) until the broker comes back? Currently,
> > the partition reassignment tool doesn't block.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 21, 2017 at 11:24 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for the explanation. Please see below my thoughts.
> > >
> > > 10. I see. So you are concerned with the potential implementation
> > > complexity which I wasn't aware of. I think it is OK not to do log
> > > cleaning on the .move log since there can be only one such log in each
> > > directory. I have updated the KIP to specify this:
> > >
> > > "The log segments in topicPartition.move directory will be subject to
> > > log truncation, log retention in the same way as the log segments in
> > > the source log directory. But we may not do log cleaning on the
> > > topicPartition.move to simplify the implementation."
> > >
> > > 11.2 Now I get your point. I think we have slightly different
> > > expectations of the order in which the reassignment tool updates the
> > > reassignment node in ZK and sends ChangeReplicaDirRequest.
> > >
> > > I think the reassignment tool should first create the reassignment
> > > znode and then keep sending ChangeReplicaDirRequest until success. I
> > > think sending ChangeReplicaDirRequest before updating the znode has
> > > negligible impact on the chance that the broker processes
> > > ChangeReplicaDirRequest before LeaderAndIsrRequest from the controller,
> > > because the time for the controller to receive the ZK notification,
> > > handle state machine changes and send LeaderAndIsrRequests should be
> > > much longer than the time for the reassignment tool to set up a
> > > connection with the broker and send ChangeReplicaDirRequest. Even if the
> > > broker receives LeaderAndIsrRequest a bit sooner, the data in the
> > > original replica should be small enough for the .move log to catch up
> > > very quickly, so that the broker can swap the log soon after it receives
> > > ChangeReplicaDirRequest -- otherwise the intra.broker.throttled.rate is
> > > probably too small. Does this address your concern with the performance?
> > >
> > > One concern with the suggested approach is that the
> > > ChangeReplicaDirRequest may be lost if the broker crashes before it
> > > creates the replica. I agree it is rare. But it will be confusing when
> > > it happens. Operators would have to keep verifying reassignment and
> > > possibly retry execution until success if they want to make sure that
> > > the ChangeReplicaDirRequest is executed.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > > On Tue, Mar 21, 2017 at 8:37 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > 10. I was mainly concerned about the additional complexity needed to
> > > > support log cleaning in the .move log. For example, LogToClean is
> > > > keyed off TopicPartition. To be able to support cleaning different
> > > > instances of the same partition, we need additional logic. I am not
> > > > sure how much additional complexity is needed and whether it's worth
> > > > it. If we don't do log cleaning at all on the .move log, then we
> > > > don't have to change the log cleaner's code.
> > > >
> > > > 11.2 I was thinking of the following flow. In the execute phase, the
> > > > reassignment tool first issues a ChangeReplicaDirRequest to brokers
> > > > where new replicas will be created. The brokers remember the mapping
> > > > and return a successful code. The reassignment tool then initiates
> > > > the cross broker movement through the controller. In the verify
> > > > phase, in addition to checking the replica assignment at the brokers,
> > > > it issues DescribeDirsRequest to check the replica to log dirs
> > > > mapping. For each partition in the response, the broker returns a
> > > > state to indicate whether the replica is final, temporary or pending.
> > > > If all replicas are in the final state, the tool checks if all
> > > > replicas are in the expected log dirs. If they are not, output a
> > > > warning (and perhaps suggest that users move the data again).
> > > > However, this should be rare.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Thanks for the response! It seems that we have only two remaining
> > > issues.
> > > > > Please see my reply below.
> > > > >
> > > > > On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Dong,
> > > > > >
> > > > > > Thanks for the update. A few replies inlined below.
> > > > > >
> > > > > > On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hey Jun,
> > > > > > >
> > > > > > > Thanks for your comment! Please see my reply below.
> > > > > > >
> > > > > > > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > > > >
> > > > > > > > Hi, Dong,
> > > > > > > >
> > > > > > > > Thanks for the reply.
> > > > > > > >
> > > > > > > > 10. Could you comment on that?
> > > > > > > >
> > > > > > >
> > > > > > > Sorry, I missed that comment.
> > > > > > >
> > > > > > > Good point. I think the log segments in topicPartition.move
> > > > > > > directory will be subject to log truncation, log retention and
> > > > > > > log cleaning in the same way as the log segments in the source
> > > > > > > log directory. I just specified this in the KIP.
> > > > > > >
> > > > > > >
> > > > > > This is ok, but doubles the overhead of log cleaning. We probably
> > > want
> > > > to
> > > > > > think a bit more on this.
> > > > > >
> > > > >
> > > > > I think this is OK because the number of replicas that are being
> > > > > moved is limited by the number of ReplicaMoveThread. The default
> > > > > number of ReplicaMoveThread is the number of log directories, which
> > > > > means we incur this overhead for at most one replica per log
> > > > > directory at any time. Suppose there are more than 100 replicas in
> > > > > any log directory; then the increase in overhead is less than 1%.
> > > > >
> > > > > Another way to look at this is that this is no worse than replica
> > > > > reassignment. When we reassign a replica from one broker to
> > > > > another, we will double the overhead of log cleaning in the cluster
> > > > > for this replica. If we are OK with this then we are OK with
> > > > > replica movement between log directories.
> > > > >
> > > > >
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > 11.2 "I am concerned that the ChangeReplicaDirRequest would
> be
> > > lost
> > > > > if
> > > > > > > > broker
> > > > > > > > restarts after it sends ChangeReplicaDirResponse but before
> it
> > > > > receives
> > > > > > > > LeaderAndIsrRequest."
> > > > > > > >
> > > > > > > > In that case, the reassignment tool could detect that through
> > > > > > > > DescribeDirsRequest
> > > > > > > > and issue ChangeReplicaDirRequest again, right? In the common
> > > case,
> > > > > > this
> > > > > > > is
> > > > > > > > probably not needed and we only need to write each replica
> > once.
> > > > > > > >
> > > > > > > > My main concern with the approach in the current KIP is that
> > > once a
> > > > > new
> > > > > > > > replica is created in the wrong log dir, the cross log
> > directory
> > > > > > movement
> > > > > > > > may not catch up until the new replica is fully bootstrapped.
> > So,
> > > > we
> > > > > > end
> > > > > > > up
> > > > > > > > writing the data for the same replica twice.
> > > > > > > >
> > > > > > >
> > > > > > > I agree with your concern. My main concern is that it is a bit
> > > > > > > weird if ChangeReplicaDirResponse cannot guarantee success and
> > > > > > > the tool needs to rely on DescribeDirsResponse to see if it
> > > > > > > needs to send ChangeReplicaDirRequest again.
> > > > > > >
> > > > > > > How about this: If the broker does not already have a replica
> > > > > > > created for the specified topicPartition when it receives
> > > > > > > ChangeReplicaDirRequest, it will reply
> > > > > > > ReplicaNotAvailableException AND remember the (replica,
> > > > > > > destination log directory) pair in memory to create the replica
> > > > > > > in the specified log directory.
> > > > > > >
> > > > > > >
> > > > > > I am not sure if returning ReplicaNotAvailableException is
> useful?
> > > What
> > > > > > will the client do on receiving ReplicaNotAvailableException in
> > this
> > > > > case?
> > > > > >
> > > > > > Perhaps we could just replace the is_temporary field in
> > > > > > DescribeDirsResponsePartition with a state field. We can use 0 to
> > > > > > indicate the partition is created, 1 to indicate the partition is
> > > > > > temporary and 2 to indicate that the partition is pending.
> > > > > >
> > > > >
> > > > > ReplicaNotAvailableException is useful because the client can
> re-send
> > > > > ChangeReplicaDirRequest (with backoff) after receiving
> > > > > ReplicaNotAvailableException in the response.
> ChangeReplicaDirRequest
> > > > will
> > > > > only succeed after replica has been created for the specified
> > partition
> > > > in
> > > > > the broker.
> > > > >
> > > > > I think this is cleaner than asking the reassignment tool to detect
> > > > > that through DescribeDirsRequest and issue ChangeReplicaDirRequest
> > > > > again. Both solutions have the same chance of writing the data for
> > > > > the same replica twice. In the original solution, the reassignment
> > > > > tool will keep retrying ChangeReplicaDirRequest until success. In
> > > > > the second suggested solution, the reassignment tool needs to send
> > > > > ChangeReplicaDirRequest, send DescribeDirsRequest to verify the
> > > > > result, and retry ChangeReplicaDirRequest and DescribeDirsRequest
> > > > > again if the replica hasn't been created already. Thus the second
> > > > > solution couples ChangeReplicaDirRequest with DescribeDirsRequest
> > > > > and makes the tool's logic a bit more complicated.
> > > > >
> > > > > Besides, I am not sure I understand your suggestion for the
> > > > > is_temporary field. It seems that a replica can have only two
> > > > > states, i.e. normal if it is being used to serve fetch/produce
> > > > > requests and temporary if it is a replica that is catching up with
> > > > > the normal one. If you think we should have the reassignment tool
> > > > > send DescribeDirsRequest before retrying ChangeReplicaDirRequest,
> > > > > can you elaborate a bit on what the "pending" state is?
> > > > >
> > > > >
> > > > > >
> > > > > >
> > > > > > > >
> > > > > > > > 11.3 Are you saying the value in --throttle will be used to
> > > > > > > > set both intra.broker.throttled.rate and
> > > > > > > > leader.follower.replication.throttled.replicas?
> > > > > > > >
> > > > > > >
> > > > > > > No. --throttle will be used only to set
> > > > > > > leader.follower.replication as it does now. I think we do not
> > > > > > > need any option in kafka-reassign-partitions.sh to specify
> > > > > > > intra.broker.throttled.rate. The user can set it in the broker
> > > > > > > config or dynamically using kafka-configs.sh. Does this sound
> > > > > > > OK?
> > > > > > >
> > > > > > >
> > > > > > Ok. This sounds good. It would be useful to make this clear in
> the
> > > > wiki.
> > > > > >
> > > > > Sure. I have updated the wiki to specify this: "the quota specified
> > > > > by the argument `--throttle` will be applied to only inter-broker
> > > > > replica reassignment. It does not affect the quota for replica
> > > > > movement between log directories".
> > > > >
> > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > 12.2 If the user only wants to check one topic, the tool
> could
> > do
> > > > the
> > > > > > > > filtering on the client side, right? My concern with having
> > both
> > > > > > log_dirs
> > > > > > > > and topics is the semantic. For example, if both are not
> empty,
> > > do
> > > > we
> > > > > > > > return the intersection or the union?
> > > > > > > >
> > > > > > >
> > > > > > > Yes the tool could filter on the client side. But the purpose
> > > > > > > of having this field is to reduce response size in case the
> > > > > > > broker has a lot of topics. Both fields are used as filters and
> > > > > > > the result is the intersection. Do you think this semantic is
> > > > > > > confusing or counter-intuitive?
> > > > > >
> > > > > >
> > > > > > >
> > > > > >
> > > > > > Ok. Could we document the semantic when both dirs and topics are
> > > > > specified?
> > > > > >
> > > > >
> > > > > Sure. I have updated the wiki to specify this: "log_dirs and topics
> > are
> > > > > used to filter the results to include only the specified
> > log_dir/topic.
> > > > The
> > > > > result is the intersection of both filters".
> > > > >
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <
> lindon...@gmail.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Jun,
> > > > > > > > >
> > > > > > > > > Thanks much for your detailed comments. Please see my reply
> > > > below.
> > > > > > > > >
> > > > > > > > > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io
> >
> > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Dong,
> > > > > > > > > >
> > > > > > > > > > Thanks for the updated KIP. Some more comments below.
> > > > > > > > > >
> > > > > > > > > > 10. For the .move log, do we perform any segment deletion
> > > > (based
> > > > > on
> > > > > > > > > > retention) or log cleaning (if a compacted topic)? Or do
> we
> > > > only
> > > > > > > enable
> > > > > > > > > > that after the swap?
> > > > > > > > > >
> > > > > > > > > > 11. kafka-reassign-partitions.sh
> > > > > > > > > > 11.1 If all reassigned replicas are in the current broker
> > and
> > > > > only
> > > > > > > the
> > > > > > > > > log
> > > > > > > > > > directories have changed, we can probably optimize the
> tool
> > > to
> > > > > not
> > > > > > > > > trigger
> > > > > > > > > > partition reassignment through the controller and only
> > > > > > > > > > send ChangeReplicaDirRequest.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yes, the reassignment script should not create the
> > > > > > > > > reassignment znode if no replicas are to be moved between
> > > > > > > > > brokers. This falls into the "How to move replica between
> > > > > > > > > log directories on the same broker" of the Proposed Change
> > > > > > > > > section.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 11.2 If ChangeReplicaDirRequest specifies a replica
> that's
> > > not
> > > > > > > created
> > > > > > > > > yet,
> > > > > > > > > > could the broker just remember that in memory and create
> > the
> > > > > > replica
> > > > > > > > when
> > > > > > > > > > the creation is requested? This way, when doing cluster
> > > > > expansion,
> > > > > > we
> > > > > > > > can
> > > > > > > > > > make sure that the new replicas on the new brokers are
> > > created
> > > > in
> > > > > > the
> > > > > > > > > right
> > > > > > > > > > log directory in the first place. We can also avoid the
> > tool
> > > > > having
> > > > > > > to
> > > > > > > > > keep
> > > > > > > > > > issuing ChangeReplicaDirRequest in response to
> > > > > > > > > > ReplicaNotAvailableException.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > I am concerned that the ChangeReplicaDirRequest would be
> lost
> > > if
> > > > > > broker
> > > > > > > > > restarts after it sends ChangeReplicaDirResponse but before
> > it
> > > > > > receives
> > > > > > > > > LeaderAndIsrRequest. In this case, the user will receive
> > > success
> > > > > when
> > > > > > > > they
> > > > > > > > > initiate replica reassignment, but replica reassignment
> will
> > > > never
> > > > > > > > complete
> > > > > > > > > when they verify the reassignment later. This would be
> > > confusing
> > > > to
> > > > > > > user.
> > > > > > > > >
> > > > > > > > > There are three different approaches to this problem if the
> > > > > > > > > broker has not created the replica yet after it receives
> > > > > > > > > ChangeReplicaDirRequest:
> > > > > > > > >
> > > > > > > > > 1) Broker immediately replies to user with
> > > > > > > > > ReplicaNotAvailableException and user can decide to retry
> > > > > > > > > again later. The advantage of this solution is that the
> > > > > > > > > broker logic is very simple and the reassignment script
> > > > > > > > > logic also seems straightforward. The disadvantage is that
> > > > > > > > > the user script has to retry. But it seems fine - we can
> > > > > > > > > set the interval between retries to be 0.5 sec so that the
> > > > > > > > > broker won't be bombarded by those requests. This is the
> > > > > > > > > solution chosen in the current KIP.
> > > > > > > > >
> > > > > > > > > 2) Broker can put ChangeReplicaDirRequest in a purgatory
> with
> > > > > timeout
> > > > > > > and
> > > > > > > > > replies to user after the replica has been created. I
> didn't
> > > > choose
> > > > > > > this
> > > > > > > > in
> > > > > > > > > the interest of keeping broker logic simpler.
> > > > > > > > >
> > > > > > > > > 3) Broker can remember that by making a mark in the disk,
> > e.g.
> > > > > create
> > > > > > > > > topicPartition.tomove directory in the destination log
> > > directory.
> > > > > > This
> > > > > > > > mark
> > > > > > > > > will be persisted across broker restart. This is the first
> > > idea I
> > > > > had
> > > > > > > > but I
> > > > > > > > > replaced it with solution 1) in the interest of keeping
> > broker
> > > > > > simple.
> > > > > > > > >
> > > > > > > > > It seems that solution 1) is the simplest one that works.
> > But I
> > > > am
> > > > > OK
> > > > > > > to
> > > > > > > > > switch to the other two solutions if we don't want the
> retry
> > > > logic.
> > > > > > > What
> > > > > > > > do
> > > > > > > > > you think?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 11.3 Do we need an option in the tool to specify
> > > > > > > > > > intra.broker.throttled.rate?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > I don't find it useful to add this option to
> > > > > > > > kafka-reassign-partitions.sh.
> > > > > > > > > The reason we have the option "--throttle" in the script to
> > > > > throttle
> > > > > > > > > replication rate is that we usually want higher quota to
> fix
> > an
> > > > > > offline
> > > > > > > > > replica to get out of URP. But we are OK to have a lower
> > quota
> > > if
> > > > > we
> > > > > > > are
> > > > > > > > > moving replica only to balance the cluster. Thus it is
> common
> > > for
> > > > > SRE
> > > > > > > to
> > > > > > > > > use different quota when using kafka-reassign-partitions.sh
> > to
> > > > move
> > > > > > > > replica
> > > > > > > > > between brokers.
> > > > > > > > >
> > > > > > > > > However, the only reason for moving replica between log
> > > > directories
> > > > > > of
> > > > > > > > the
> > > > > > > > > same broker is to balance cluster resource. Thus the option
> > to
> > > > > > > > > specify intra.broker.throttled.rate in the tool is not that
> > > > > useful. I
> > > > > > > am
> > > > > > > > > inclined not to add this option to keep this tool's usage
> > > > simpler.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > 12. DescribeDirsRequest
> > > > > > > > > > 12.1 In other requests like CreateTopicRequest, we return
> > an
> > > > > empty
> > > > > > > list
> > > > > > > > > in
> > > > > > > > > > the response for an empty input list. If the input list
> is
> > > > null,
> > > > > we
> > > > > > > > > return
> > > > > > > > > > everything. We should probably follow the same convention
> > > here.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks. I wasn't aware of this convention. I have changed
> > > > > > > > > DescribeDirsRequest so that "null" indicates "all".
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 12.2 Do we need the topics field? Since the request is
> > about
> > > > log
> > > > > > > dirs,
> > > > > > > > it
> > > > > > > > > > makes sense to specify the log dirs. But it's weird to
> > > specify
> > > > > > > topics.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > The topics field is not necessary. But it is useful to
> > > > > > > > > reduce the response size in case users are only interested
> > > > > > > > > in the status of a few topics. For example, a user may have
> > > > > > > > > initiated the reassignment of a given replica from one log
> > > > > > > > > directory to another log directory on the same broker, and
> > > > > > > > > the user only wants to check the status of this given
> > > > > > > > > partition by looking at DescribeDirsResponse. Thus this
> > > > > > > > > field is useful.
> > > > > > > > >
> > > > > > > > > I am not sure if it is weird to call this request
> > > > > > > > > DescribeDirsRequest. The response is a map from log
> > > > > > > > > directory to information about some partitions on the log
> > > > > > > > > directory. Do you think we need to change the name of the
> > > > > > > > > request?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 12.3 DescribeDirsResponsePartition: Should we include
> > > > firstOffset
> > > > > > and
> > > > > > > > > > nextOffset in the response? That could be useful to track
> > the
> > > > > > > progress
> > > > > > > > of
> > > > > > > > > > the movement.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yeah good point. I agree it is useful to include
> logEndOffset
> > > in
> > > > > the
> > > > > > > > > response. According to Log.scala doc the logEndOffset is
> > > > equivalent
> > > > > > to
> > > > > > > > the
> > > > > > > > > nextOffset. User can track progress by checking the
> > difference
> > > > > > between
> > > > > > > > > logEndOffset of the given partition in the source and
> > > destination
> > > > > log
> > > > > > > > > directories. I have added logEndOffset to the
> > > > > > > > DescribeDirsResponsePartition
> > > > > > > > > in the KIP.
> > > > > > > > >
> > > > > > > > > But it seems that we don't need firstOffset in the
> response.
> > Do
> > > > you
> > > > > > > think
> > > > > > > > > firstOffset is still needed?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > 13. ChangeReplicaDirResponse: Do we need error code at
> both
> > > > > levels?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > My bad. It is not needed. I have removed request level
> error
> > > > code.
> > > > > I
> > > > > > > also
> > > > > > > > > added ChangeReplicaDirRequestTopic and
> > > > > ChangeReplicaDirResponseTopic
> > > > > > to
> > > > > > > > > reduce duplication of the "topic" string in the request and
> > > > > response.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > 14. num.replica.move.threads: Does it default to # log
> > dirs?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > No. It doesn't. I expect the default number to be set to a
> > > > > > > > > conservative value such as 3. It may be surprising to users
> > > > > > > > > if the number of threads increases just because they have
> > > > > > > > > assigned more log directories to the Kafka broker.
> > > > > > > > > It seems that the number of replica move threads doesn't
> have
> > > to
> > > > > > depend
> > > > > > > > on
> > > > > > > > > the number of log directories. It is possible to have one
> > > thread
> > > > > that
> > > > > > > > moves
> > > > > > > > > replicas across all log directories. On the other hand we
> can
> > > > have
> > > > > > > > multiple
> > > > > > > > > threads to move replicas to the same log directory. For
> > > example,
> > > > if
> > > > > > > > broker
> > > > > > > > > uses SSD, the CPU instead of disk IO may be the replica
> move
> > > > > > bottleneck
> > > > > > > > and
> > > > > > > > > it will be faster to move replicas using multiple threads
> per
> > > log
> > > > > > > > > directory.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <
> > > lindon...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > I just made one correction in the KIP. If the broker
> > > > > > > > > > > receives ChangeReplicaDirRequest and the replica hasn't
> > > > > > > > > > > been created there, the broker will respond
> > > > > > > > > > > ReplicaNotAvailableException. The
> > > > > > > > > > > kafka-reassign-partitions.sh will need to re-send
> > > > > > > > > > > ChangeReplicaDirRequest in this case in order to wait
> > > > > > > > > > > for the controller to send LeaderAndIsrRequest to the
> > > > > > > > > > > broker. The previous approach of creating an empty
> > > > > > > > > > > directory seems hacky.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <
> > > > lindon...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for your comments! I have updated the KIP to
> > > address
> > > > > > your
> > > > > > > > > > > comments.
> > > > > > > > > > > > Please see my reply inline.
> > > > > > > > > > > >
> > > > > > > > > > > > Can you let me know if the latest KIP has addressed
> > your
> > > > > > > comments?
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <
> > > j...@confluent.io>
> > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hi, Dong,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for the reply.
> > > > > > > > > > > >>
> > > > > > > > > > > >> 1.3 So the thread gets the lock, checks if caught up
> > and
> > > > > > > releases
> > > > > > > > > the
> > > > > > > > > > > lock
> > > > > > > > > > > >> if not? Then, in the case when there is continuous
> > > > incoming
> > > > > > > data,
> > > > > > > > > the
> > > > > > > > > > > >> thread may never get a chance to swap. One way to
> > > address
> > > > > this
> > > > > > > is
> > > > > > > > > when
> > > > > > > > > > > the
> > > > > > > > > > > >> thread is getting really close in catching up, just
> > hold
> > > > > onto
> > > > > > > the
> > > > > > > > > lock
> > > > > > > > > > > >> until the thread fully catches up.
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > > Yes, that was my original solution. I see your point
> > > > > > > > > > > > that the lock may not be fairly assigned to
> > > > > > > > > > > > ReplicaMoveThread and RequestHandlerThread when there
> > > > > > > > > > > > are frequent incoming requests. Your solution should
> > > > > > > > > > > > address the problem and I have updated the KIP to use
> > > > > > > > > > > > it.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >>
> > > > > > > > > > > >> 2.3 So, you are saying that the partition
> reassignment
> > > > tool
> > > > > > can
> > > > > > > > > first
> > > > > > > > > > > send
> > > > > > > > > > > >> a ChangeReplicaDirRequest to relevant brokers to
> > > establish
> > > > > the
> > > > > > > log
> > > > > > > > > dir
> > > > > > > > > > > for
> > > > > > > > > > > >> replicas not created yet, then trigger the partition
> > > > > movement
> > > > > > > > across
> > > > > > > > > > > >> brokers through the controller? That's actually a
> good
> > > > idea.
> > > > > > > Then,
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > >> just leave LeaderAndIsrRequest as it is.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Yes, that is what I plan to do. If broker receives a
> > > > > > > > > > > > ChangeReplicaDirRequest while it is not leader or
> > > follower
> > > > of
> > > > > > the
> > > > > > > > > > > > partition, the broker will create an empty Log
> instance
> > > > > (i.e. a
> > > > > > > > > > directory
> > > > > > > > > > > > named topicPartition) in the destination log
> directory
> > so
> > > > > that
> > > > > > > the
> > > > > > > > > > > replica
> > > > > > > > > > > > will be placed there when broker receives
> > > > LeaderAndIsrRequest
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > > > > controller. The broker should clean up those empty Log
> > > > instances
> > > > > on
> > > > > > > > > startup
> > > > > > > > > > > > just in case a ChangeReplicaDirRequest was mistakenly
> > > sent
> > > > > to a
> > > > > > > > > broker
> > > > > > > > > > > that
> > > > > > > > > > > > was not meant to be follower/leader of the
> partition.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >> Another thing related to
> > > > > > > > > > > >> ChangeReplicaDirRequest.
> > > > > > > > > > > >> Since this request may take long to complete, I am
> not
> > > > sure
> > > > > if
> > > > > > > we
> > > > > > > > > > should
> > > > > > > > > > > > >> wait for the movement to complete before responding.
> > While
> > > > > > waiting
> > > > > > > > for
> > > > > > > > > > the
> > > > > > > > > > > >> movement to complete, the idle connection may be
> > killed
> > > or
> > > > > the
> > > > > > > > > client
> > > > > > > > > > > may
> > > > > > > > > > > >> be gone already. An alternative is to return
> > immediately
> > > > and
> > > > > > > add a
> > > > > > > > > new
> > > > > > > > > > > >> request like CheckReplicaDirRequest to see if the
> > > movement
> > > > > has
> > > > > > > > > > > completed.
> > > > > > > > > > > >> The tool can take advantage of that to check the
> > status.
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > > > I agree with your concern and solution. We need a
> request
> > > to
> > > > > > query
> > > > > > > > the
> > > > > > > > > > > > partition -> log_directory mapping on the broker. I
> > have
> > > > > > updated
> > > > > > > > the
> > > > > > > > > > KIP
> > > > > > > > > > > to
> > > > > > > > > > > > > remove the need for ChangeReplicaDirRequestPurgatory.
> > > > > > > > > > > > > Instead, kafka-reassign-partitions.sh will send
> > > > > > > > > > DescribeDirsRequest
> > > > > > > > > > > > to brokers when user wants to verify the partition
> > > > > assignment.
> > > > > > > > Since
> > > > > > > > > we
> > > > > > > > > > > > need this DescribeDirsRequest anyway, we can also use
> > > this
> > > > > > > request
> > > > > > > > to
> > > > > > > > > > > > expose stats like the individual log size instead of
> > > using
> > > > > JMX.
> > > > > > > One
> > > > > > > > > > > > drawback of using JMX is that user has to manage the
> > JMX
> > > > port
> > > > > > and
> > > > > > > > > > related
> > > > > > > > > > > > credentials if they haven't already done this, which
> is
> > > the
> > > > > > case
> > > > > > > at
> > > > > > > > > > > > LinkedIn.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >> Thanks,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Jun
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <
> > > > > lindon...@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Hey Jun,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for the detailed explanation. I will use
> the
> > > > > separate
> > > > > > > > > thread
> > > > > > > > > > > >> pool to
> > > > > > > > > > > >> > move replica between log directories. I will let
> you
> > > > know
> > > > > > when
> > > > > > > > the
> > > > > > > > > > KIP
> > > > > > > > > > > >> has
> > > > > > > > > > > >> > been updated to use a separate thread pool.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Here is my response to your other questions:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 1.3 My idea is that the ReplicaMoveThread that
> moves
> > > > data
> > > > > > > should
> > > > > > > > > get
> > > > > > > > > > > the
> > > > > > > > > > > >> > lock before checking whether the replica in the
> > > > > destination
> > > > > > > log
> > > > > > > > > > > >> directory
> > > > > > > > > > > >> > has caught up. If the new replica has caught up,
> > then
> > > > the
> > > > > > > > > > > >> ReplicaMoveThread
> > > > > > > > > > > > >> > should swap the replica while it is still holding
> > the
> > > > > lock.
> > > > > > > The
> > > > > > > > > > > >> > ReplicaFetcherThread or RequestHandlerThread will
> > not
> > > be
> > > > > > able
> > > > > > > to
> > > > > > > > > > > append
> > > > > > > > > > > > >> > data to the replica in the source log directory during
> > this
> > > > > period
> > > > > > > > > because
> > > > > > > > > > > >> they
> > > > > > > > > > > >> > can not get the lock. Does this address the
> problem?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 2.3 I get your point that we want to keep
> controller
> > > > > > simpler.
> > > > > > > If
> > > > > > > > > > admin
> > > > > > > > > > > >> tool
> > > > > > > > > > > >> > can send ChangeReplicaDirRequest to move data
> > within a
> > > > > > broker,
> > > > > > > > > then
> > > > > > > > > > > >> > controller probably doesn't even need to include
> log
> > > > > > directory
> > > > > > > > > path
> > > > > > > > > > in
> > > > > > > > > > > >> the
> > > > > > > > > > > >> > LeaderAndIsrRequest. How about this: controller
> will
> > > > only
> > > > > > deal
> > > > > > > > > with
> > > > > > > > > > > >> > reassignment across brokers as it does now. If
> user
> > > > > > specified
> > > > > > > > > > > > >> a destination
> > > > > > > > > > > > >> > disk for any replica, the admin tool will send
> > > > > > > > > > ChangeReplicaDirRequest
> > > > > > > > > > > >> and
> > > > > > > > > > > >> > wait for response from broker to confirm that all
> > > > replicas
> > > > > > > have
> > > > > > > > > been
> > > > > > > > > > > >> moved
> > > > > > > > > > > > >> > to the destination log directory. The broker will
> > put
> > > > > > > > > > > > >> > ChangeReplicaDirRequest in a purgatory and respond
> > > > either
> > > > > > when
> > > > > > > > the
> > > > > > > > > > > >> movement
> > > > > > > > > > > > >> > is completed or when the request has timed out.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 4. I agree that we can expose these metrics via
> JMX.
> > > > But I
> > > > > > am
> > > > > > > > not
> > > > > > > > > > sure
> > > > > > > > > > > >> if
> > > > > > > > > > > >> > it can be obtained easily with good performance
> > using
> > > > > either
> > > > > > > > > > existing
> > > > > > > > > > > >> tools
> > > > > > > > > > > >> > or new script in kafka. I will ask SREs for their
> > > > opinion.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > >> > Dong
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <
> > > > j...@confluent.io
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > Hi, Dong,
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Thanks for the updated KIP. A few more comments
> > > below.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > 1.1 and 1.2: I am still not sure there is enough
> > > > benefit
> > > > > > of
> > > > > > > > > > reusing
> > > > > > > > > > > >> > > ReplicaFetchThread
> > > > > > > > > > > >> > > to move data across disks.
> > > > > > > > > > > >> > > (a) A big part of ReplicaFetchThread is to deal
> > with
> > > > > > issuing
> > > > > > > > and
> > > > > > > > > > > >> tracking
> > > > > > > > > > > >> > > fetch requests. So, it doesn't feel that we get
> > much
> > > > > from
> > > > > > > > > reusing
> > > > > > > > > > > >> > > ReplicaFetchThread
> > > > > > > > > > > >> > > only to disable the fetching part.
> > > > > > > > > > > >> > > (b) The leader replica has no ReplicaFetchThread
> > to
> > > > > start
> > > > > > > > with.
> > > > > > > > > It
> > > > > > > > > > > >> feels
> > > > > > > > > > > >> > > weird to start one just for intra broker data
> > > > movement.
> > > > > > > > > > > >> > > (c) The ReplicaFetchThread is per broker.
> > > Intuitively,
> > > > > the
> > > > > > > > > number
> > > > > > > > > > of
> > > > > > > > > > > >> > > threads doing intra broker data movement should
> be
> > > > > related
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > >> number
> > > > > > > > > > > >> > of
> > > > > > > > > > > >> > > disks in the broker, not the number of brokers
> in
> > > the
> > > > > > > cluster.
> > > > > > > > > > > >> > > (d) If the destination disk fails, we want to
> stop
> > > the
> > > > > > intra
> > > > > > > > > > broker
> > > > > > > > > > > >> data
> > > > > > > > > > > >> > > movement, but want to continue inter broker
> > > > replication.
> > > > > > So,
> > > > > > > > > > > >> logically,
> > > > > > > > > > > >> > it
> > > > > > > > > > > >> > > seems it's better to separate out the two.
> > > > > > > > > > > >> > > (e) I am also not sure if we should reuse the
> > > existing
> > > > > > > > > throttling
> > > > > > > > > > > for
> > > > > > > > > > > >> > > replication. It's designed to handle traffic
> > across
> > > > > > brokers
> > > > > > > > and
> > > > > > > > > > the
> > > > > > > > > > > >> > > delaying is done in the fetch request. So, if we
> > are
> > > > not
> > > > > > > doing
> > > > > > > > > > > >> > > fetching in ReplicaFetchThread,
> > > > > > > > > > > >> > > I am not sure the existing throttling is
> > effective.
> > > > > Also,
> > > > > > > when
> > > > > > > > > > > >> specifying
> > > > > > > > > > > >> > > the throttling of moving data across disks, it
> > seems
> > > > the
> > > > > > > user
> > > > > > > > > > > >> shouldn't
> > > > > > > > > > > >> > > care about whether a replica is a leader or a
> > > > follower.
> > > > > > > > Reusing
> > > > > > > > > > the
> > > > > > > > > > > >> > > existing throttling config name will be awkward
> in
> > > > this
> > > > > > > > regard.
> > > > > > > > > > > >> > > (f) It seems it's simpler and more consistent to
> > > use a
> > > > > > > > separate
> > > > > > > > > > > thread
> > > > > > > > > > > >> > pool
> > > > > > > > > > > >> > > for local data movement (for both leader and
> > > follower
> > > > > > > > replicas).
> > > > > > > > > > > This
> > > > > > > > > > > >> > > process can then be configured (e.g. number of
> > > > threads,
> > > > > > etc)
> > > > > > > > and
> > > > > > > > > > > >> > throttled
> > > > > > > > > > > >> > > independently.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > 1.3 Yes, we will need some synchronization
> there.
> > > So,
> > > > if
> > > > > > the
> > > > > > > > > > > movement
> > > > > > > > > > > >> > > thread catches up, gets the lock to do the swap,
> > but
> > > > > > > realizes
> > > > > > > > > that
> > > > > > > > > > > new
> > > > > > > > > > > >> > data
> > > > > > > > > > > >> > > is added, it has to continue catching up while
> > > holding
> > > > > the
> > > > > > > > lock?
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > 2.3 The benefit of including the desired log
> > > directory
> > > > > in
> > > > > > > > > > > >> > > LeaderAndIsrRequest
> > > > > > > > > > > >> > > during partition reassignment is that the
> > controller
> > > > > > doesn't
> > > > > > > > > need
> > > > > > > > > > to
> > > > > > > > > > > >> > track
> > > > > > > > > > > >> > > the progress for disk movement. So, you don't
> need
> > > the
> > > > > > > > > additional
> > > > > > > > > > > >> > > BrokerDirStateUpdateRequest. Then the controller
> > > never
> > > > > > needs
> > > > > > > > to
> > > > > > > > > > > issue
> > > > > > > > > > > >> > > ChangeReplicaDirRequest.
> > > > > > > > > > > >> > > Only the admin tool will issue
> > > ChangeReplicaDirRequest
> > > > > to
> > > > > > > move
> > > > > > > > > > data
> > > > > > > > > > > >> > within
> > > > > > > > > > > >> > > a broker. I agree that this makes
> > > LeaderAndIsrRequest
> > > > > more
> > > > > > > > > > > >> complicated,
> > > > > > > > > > > >> > but
> > > > > > > > > > > >> > > that seems simpler than changing the controller
> to
> > > > track
> > > > > > > > > > additional
> > > > > > > > > > > >> > states
> > > > > > > > > > > >> > > during partition reassignment.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > 4. We want to make a decision on how to expose
> the
> > > > > stats.
> > > > > > So
> > > > > > > > > far,
> > > > > > > > > > we
> > > > > > > > > > > >> are
> > > > > > > > > > > >> > > exposing stats like the individual log size as
> > JMX.
> > > > So,
> > > > > > one
> > > > > > > > way
> > > > > > > > > is
> > > > > > > > > > > to
> > > > > > > > > > > >> > just
> > > > > > > > > > > > >> > > add new JMX metrics to expose the log directory of
> > > individual
> > > > > > > > replicas.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Jun
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <
> > > > > > > > lindon...@gmail.com>
> > > > > > > > > > > >> wrote:
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > > Hey Jun,
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Thanks for all the comments! Please see my
> > answer
> > > > > > below. I
> > > > > > > > > have
> > > > > > > > > > > >> updated
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > KIP to address most of the questions and make
> > the
> > > > KIP
> > > > > > > easier
> > > > > > > > > to
> > > > > > > > > > > >> > > understand.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > >> > > > Dong
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <
> > > > > > j...@confluent.io
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > Hi, Dong,
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > Thanks for the KIP. A few comments below.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > 1. For moving data across directories
> > > > > > > > > > > >> > > > > 1.1 I am not sure why we want to use
> > > > > > > ReplicaFetcherThread
> > > > > > > > to
> > > > > > > > > > > move
> > > > > > > > > > > >> > data
> > > > > > > > > > > >> > > > > around in the leader. ReplicaFetchThread
> > fetches
> > > > > data
> > > > > > > from
> > > > > > > > > > > socket.
> > > > > > > > > > > >> > For
> > > > > > > > > > > >> > > > > moving data locally, it seems that we want
> to
> > > > avoid
> > > > > > the
> > > > > > > > > socket
> > > > > > > > > > > >> > > overhead.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > The purpose of using ReplicaFetchThread is to
> > > re-use
> > > > > > > > existing
> > > > > > > > > > > thread
> > > > > > > > > > > >> > > > instead of creating more threads and make our
> > > thread
> > > > > > model
> > > > > > > > > more
> > > > > > > > > > > >> > complex.
> > > > > > > > > > > >> > > It
> > > > > > > > > > > > >> > > > seems like a natural choice for copying data
> > > between
> > > > > > disks
> > > > > > > > > since
> > > > > > > > > > it
> > > > > > > > > > > >> is
> > > > > > > > > > > >> > > > similar to copying data between brokers.
> Another
> > > > > reason
> > > > > > is
> > > > > > > > > that
> > > > > > > > > > if
> > > > > > > > > > > >> the
> > > > > > > > > > > >> > > > replica to be moved is a follower, we don't
> need
> > > > lock
> > > > > to
> > > > > > > > swap
> > > > > > > > > > > >> replicas
> > > > > > > > > > > >> > > when
> > > > > > > > > > > >> > > > destination replica has caught up, since the
> > same
> > > > > thread
> > > > > > > > which
> > > > > > > > > > is
> > > > > > > > > > > >> > > fetching
> > > > > > > > > > > >> > > > data from leader will swap the replica.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > The ReplicaFetchThread will not incur socket
> > > > overhead
> > > > > > > while
> > > > > > > > > > > copying
> > > > > > > > > > > >> > data
> > > > > > > > > > > >> > > > between disks. It will read directly from
> source
> > > > disk
> > > > > > (as
> > > > > > > we
> > > > > > > > > do
> > > > > > > > > > > when
> > > > > > > > > > > >> > > > processing FetchRequest) and write to
> > destination
> > > > disk
> > > > > > (as
> > > > > > > > we
> > > > > > > > > do
> > > > > > > > > > > >> when
> > > > > > > > > > > >> > > > processing ProduceRequest).
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > 1.2 I am also not sure about moving data in
> > the
> > > > > > > > > > > >> ReplicaFetcherThread
> > > > > > > > > > > >> > in
> > > > > > > > > > > >> > > > the
> > > > > > > > > > > >> > > > > follower. For example, I am not sure setting
> > > > > > > > > > > >> replica.fetch.max.wait
> > > > > > > > > > > >> > to
> > > > > > > > > > > >> > > 0
> > > > > > > > > > > >> > > > >  is ideal. It may not always be effective
> > since
> > > a
> > > > > > fetch
> > > > > > > > > > request
> > > > > > > > > > > in
> > > > > > > > > > > >> > the
> > > > > > > > > > > >> > > > > ReplicaFetcherThread could be arbitrarily
> > > delayed
> > > > > due
> > > > > > to
> > > > > > > > > > > >> replication
> > > > > > > > > > > >> > > > > throttling on the leader. In general, the
> data
> > > > > > movement
> > > > > > > > > logic
> > > > > > > > > > > >> across
> > > > > > > > > > > >> > > > disks
> > > > > > > > > > > >> > > > > seems different from that in
> > > ReplicaFetcherThread.
> > > > > > So, I
> > > > > > > > am
> > > > > > > > > > not
> > > > > > > > > > > >> sure
> > > > > > > > > > > >> > > why
> > > > > > > > > > > >> > > > > they need to be coupled.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > While it may not be the most efficient way to
> > copy
> > > > > data
> > > > > > > > > between
> > > > > > > > > > > >> local
> > > > > > > > > > > >> > > > disks, it will be at least as efficient as
> > copying
> > > > > data
> > > > > > > from
> > > > > > > > > > > leader
> > > > > > > > > > > >> to
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > destination disk. The expected goal of KIP-113
> > is
> > > to
> > > > > > > enable
> > > > > > > > > data
> > > > > > > > > > > >> > movement
> > > > > > > > > > > >> > > > between disks with no less efficiency than
> what
> > we
> > > > do
> > > > > > now
> > > > > > > > when
> > > > > > > > > > > >> moving
> > > > > > > > > > > >> > > data
> > > > > > > > > > > >> > > > between brokers. I think we can optimize its
> > > > > performance
> > > > > > > > using
> > > > > > > > > > > >> separate
> > > > > > > > > > > >> > > > thread if the performance is not good enough.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > 1.3 Could you add a bit more details on how
> we
> > > > swap
> > > > > > the
> > > > > > > > > > replicas
> > > > > > > > > > > >> when
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > > new ones are fully caught up? For example,
> > what
> > > > > > happens
> > > > > > > > when
> > > > > > > > > > the
> > > > > > > > > > > >> new
> > > > > > > > > > > >> > > > > replica in the new log directory is caught
> up,
> > > but
> > > > > > when
> > > > > > > we
> > > > > > > > > > want
> > > > > > > > > > > >> to do
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > > swap, some new data has arrived?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > If the replica is a leader, then
> > > > ReplicaFetcherThread
> > > > > > will
> > > > > > > > > > perform
> > > > > > > > > > > >> the
> > > > > > > > > > > >> > > > replacement. Proper lock is needed to prevent
> > > > > > > > > > KafkaRequestHandler
> > > > > > > > > > > >> from
> > > > > > > > > > > >> > > > appending data to the topicPartition.log on
> the
> > > > source
> > > > > > > disks
> > > > > > > > > > > before
> > > > > > > > > > > >> > this
> > > > > > > > > > > >> > > > replacement is completed by
> > ReplicaFetcherThread.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > If the replica is a follower, because the same
> > > > > > > > > > ReplicaFetchThread
> > > > > > > > > > > >> which
> > > > > > > > > > > >> > > > fetches data from leader will also swap the
> > > replica,
> > > > > no
> > > > > > > > lock
> > > > > > > > > is
> > > > > > > > > > > >> > needed.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > I have updated the KIP to specify both more
> > > > > explicitly.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > 1.4 Do we need to do the .move at the log
> > > segment
> > > > > > level
> > > > > > > or
> > > > > > > > > > could
> > > > > > > > > > > >> we
> > > > > > > > > > > >> > > just
> > > > > > > > > > > >> > > > do
> > > > > > > > > > > >> > > > > that at the replica directory level?
> Renaming
> > > > just a
> > > > > > > > > directory
> > > > > > > > > > > is
> > > > > > > > > > > >> > much
> > > > > > > > > > > >> > > > > faster than renaming the log segments.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Great point. I have updated the KIP to rename
> > the
> > > > log
> > > > > > > > > directory
> > > > > > > > > > > >> > instead.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > 1.5 Could you also describe a bit what
> happens
> > > > when
> > > > > > > either
> > > > > > > > > the
> > > > > > > > > > > >> source
> > > > > > > > > > > >> > > or
> > > > > > > > > > > >> > > > > the target log directory fails while the
> data
> > > > moving
> > > > > > is
> > > > > > > in
> > > > > > > > > > > >> progress?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > If source log directory fails, then the
> replica
> > > > > movement
> > > > > > > > will
> > > > > > > > > > stop
> > > > > > > > > > > >> and
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > source replica is marked offline. If
> destination
> > > log
> > > > > > > > directory
> > > > > > > > > > > >> fails,
> > > > > > > > > > > >> > > then
> > > > > > > > > > > >> > > > the replica movement will stop. I have updated
> > the
> > > > KIP
> > > > > > to
> > > > > > > > > > clarify
> > > > > > > > > > > >> this.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > 2. For partition reassignment.
> > > > > > > > > > > >> > > > > 2.1 I am not sure if the controller can
> block
> > on
> > > > > > > > > > > >> > > ChangeReplicaDirRequest.
> > > > > > > > > > > >> > > > > Data movement may take a long time to
> > complete.
> > > If
> > > > > > there
> > > > > > > > is
> > > > > > > > > an
> > > > > > > > > > > >> > > > outstanding
> > > > > > > > > > > >> > > > > request from the controller to a broker,
> that
> > > > broker
> > > > > > > won't
> > > > > > > > > be
> > > > > > > > > > > >> able to
> > > > > > > > > > > >> > > > > process any new request from the controller.
> > So
> > > if
> > > > > > > another
> > > > > > > > > > event
> > > > > > > > > > > >> > (e.g.
> > > > > > > > > > > >> > > > > broker failure) happens when the data
> movement
> > > is
> > > > in
> > > > > > > > > progress,
> > > > > > > > > > > >> > > subsequent
> > > > > > > > > > > > >> > > > > LeaderAndIsrRequest will be delayed.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Yeah good point. I missed the fact that there
> is
> > > > > only
> > > > > > > one
> > > > > > > > > > > >> inflight
> > > > > > > > > > > >> > > > request from controller to broker.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > How about I add a request, e.g.
> > > > > > > BrokerDirStateUpdateRequest,
> > > > > > > > > > which
> > > > > > > > > > > >> maps
> > > > > > > > > > > >> > > > topicPartition to log directory and can be
> sent
> > > from
> > > > > > > broker
> > > > > > > > to
> > > > > > > > > > > >> > controller
> > > > > > > > > > > >> > > > to indicate completion?
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > 2.2 in the KIP, the partition reassignment
> > tool
> > > is
> > > > > > also
> > > > > > > > used
> > > > > > > > > > for
> > > > > > > > > > > >> > cases
> > > > > > > > > > > >> > > > > where an admin just wants to balance the
> > > existing
> > > > > data
> > > > > > > > > across
> > > > > > > > > > > log
> > > > > > > > > > > >> > > > > directories in the broker. In this case, it
> > > seems
> > > > > that
> > > > > > > > it's
> > > > > > > > > > over
> > > > > > > > > > > >> > > killing
> > > > > > > > > > > >> > > > to
> > > > > > > > > > > >> > > > > have the process go through the controller.
> A
> > > > > simpler
> > > > > > > > > approach
> > > > > > > > > > > is
> > > > > > > > > > > >> to
> > > > > > > > > > > >> > > > issue
> > > > > > > > > > > >> > > > > an RPC request to the broker directly.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > I agree we can optimize this case. It is just
> > that
> > > > we
> > > > > > have
> > > > > > > > to
> > > > > > > > > > add
> > > > > > > > > > > >> new
> > > > > > > > > > > >> > > logic
> > > > > > > > > > > >> > > > or code path to handle a scenario that is
> > already
> > > > > > covered
> > > > > > > by
> > > > > > > > > the
> > > > > > > > > > > >> more
> > > > > > > > > > > >> > > > complicated scenario. I will add it to the
> KIP.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > 2.3 When using the partition reassignment
> tool
> > > to
> > > > > move
> > > > > > > > > > replicas
> > > > > > > > > > > >> > across
> > > > > > > > > > > > >> > > > > brokers, it makes sense to be able to specify
> > the
> > > > log
> > > > > > > > > directory
> > > > > > > > > > > of
> > > > > > > > > > > >> the
> > > > > > > > > > > >> > > > newly
> > > > > > > > > > > >> > > > > created replicas. The KIP does that in two
> > > > separate
> > > > > > > > requests
> > > > > > > > > > > >> > > > > ChangeReplicaDirRequest and
> > LeaderAndIsrRequest,
> > > > and
> > > > > > > > tracks
> > > > > > > > > > the
> > > > > > > > > > > >> > > progress
> > > > > > > > > > > >> > > > of
> > > > > > > > > > > >> > > > > each independently. An alternative is to do
> > that
> > > > > just
> > > > > > in
> > > > > > > > > > > >> > > > > LeaderAndIsrRequest.
> > > > > > > > > > > >> > > > > That way, the new replicas will be created
> in
> > > the
> > > > > > right
> > > > > > > > log
> > > > > > > > > > dir
> > > > > > > > > > > in
> > > > > > > > > > > >> > the
> > > > > > > > > > > >> > > > > first place and the controller just needs to
> > > track
> > > > > the
> > > > > > > > > > progress
> > > > > > > > > > > of
> > > > > > > > > > > >> > > > > partition reassignment in the current way.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > I agree it is better to use one request
> instead
> > of
> > > > two
> > > > > > to
> > > > > > > > > > request
> > > > > > > > > > > >> > replica
> > > > > > > > > > > >> > > > movement between disks. But I think the
> > > performance
> > > > > > > > advantage
> > > > > > > > > of
> > > > > > > > > > > >> doing
> > > > > > > > > > > >> > so
> > > > > > > > > > > >> > > > is negligible because we trigger replica
> > > reassignment
> > > > > much
> > > > > > > > less
> > > > > > > > > > than
> > > > > > > > > > > >> all
> > > > > > > > > > > >> > > > other kinds of events in the Kafka cluster. I
> am
> > > not
> > > > > > sure
> > > > > > > > that
> > > > > > > > > > the
> > > > > > > > > > > >> > > benefit
> > > > > > > > > > > >> > > > of doing this is worth the effort to add an
> > > optional
> > > > > > > string
> > > > > > > > > > field
> > > > > > > > > > > in
> > > > > > > > > > > >> > the
> > > > > > > > > > > >> > > > LeaderAndIsrRequest. Also if we add this
> > optional
> > > > > field
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > >> > > > LeaderAndIsrRequest, we probably want to
> remove
> > > > > > > > > > > >> ChangeReplicaDirRequest
> > > > > > > > > > > >> > > to
> > > > > > > > > > > >> > > > avoid having two requests doing the same
> thing.
> > > But
> > > > it
> > > > > > > means
> > > > > > > > > > user
> > > > > > > > > > > >> > script
> > > > > > > > > > > >> > > > can not send request directly to the broker to
> > > > trigger
> > > > > > > > replica
> > > > > > > > > > > >> movement
> > > > > > > > > > > >> > > > between log directories.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > I will do it if you feel strongly about this
> > > > optimization.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > 3. /admin/reassign_partitions: Including the
> > log
> > > > dir
> > > > > > in
> > > > > > > > > every
> > > > > > > > > > > >> replica
> > > > > > > > > > > >> > > may
> > > > > > > > > > > >> > > > > not be efficient. We could include a list of
> > log
> > > > > > > > directories
> > > > > > > > > > and
> > > > > > > > > > > >> > > > reference
> > > > > > > > > > > >> > > > > the index of the log directory in each
> > replica.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Good point. I have updated the KIP to use this
> > > > > solution.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > 4. DescribeDirsRequest: The stats in the
> > request
> > > > are
> > > > > > > > already
> > > > > > > > > > > >> > available
> > > > > > > > > > > >> > > > from
> > > > > > > > > > > >> > > > > JMX. Do we need the new request?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > Does JMX also include the state (i.e. offline
> or
> > > > > online)
> > > > > > > of
> > > > > > > > > each
> > > > > > > > > > > log
> > > > > > > > > > > >> > > > directory and the log directory of each
> replica?
> > > If
> > > > > not,
> > > > > > > > then
> > > > > > > > > > > maybe
> > > > > > > > > > > >> we
> > > > > > > > > > > >> > > > still need DescribeDirsRequest?
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > 5. We want to be consistent on
> > > > > ChangeReplicaDirRequest
> > > > > > > vs
> > > > > > > > > > > >> > > > > ChangeReplicaRequest.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > I think ChangeReplicaRequest and
> > > > ChangeReplicaResponse
> > > > > > is
> > > > > > > my
> > > > > > > > > > typo.
> > > > > > > > > > > >> > Sorry,
> > > > > > > > > > > >> > > > they are fixed now.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > Thanks,
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > Jun
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <
> > > > > > > > > lindon...@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > > Hey Alexey,
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > Thanks for all the comments!
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > I have updated the KIP to specify how we
> > > enforce
> > > > > > > quota.
> > > > > > > > I
> > > > > > > > > > also
> > > > > > > > > > > >> > > updated
> > > > > > > > > > > >> > > > > the
> > > > > > > > > > > >> > > > > > "The thread model and broker logic for
> > moving
> > > > > > replica
> > > > > > > > data
> > > > > > > > > > > >> between
> > > > > > > > > > > >> > > log
> > > > > > > > > > > >> > > > > > directories" to make it easier to read.
> You
> > > can
> > > > > find
> > > > > > > the
> > > > > > > > > > exact
> > > > > > > > > > > >> > change
> > > > > > > > > > > >> > > > > here
> > > > > > > > > > > >> > > > > > <https://cwiki.apache.org/conf
> > > > > > > > > > luence/pages/diffpagesbyversio
> > > > > > > > > > > >> > > > > > n.action?pageId=67638408&selec
> > > > > > > > > > tedPageVersions=5&selectedPage
> > > > > > > > > > > >> > > > Versions=6>.
> > > > > > > > > > > >> > > > > > The idea is to use the same replication
> > quota
> > > > > > > mechanism
> > > > > > > > > > > >> introduced
> > > > > > > > > > > >> > in
> > > > > > > > > > > >> > > > > > KIP-73.
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > >> > > > > > Dong
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey
> > > > Ozeritsky <
> > > > > > > > > > > >> > > aozerit...@yandex.ru
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > > wrote:
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <
> > > > > > lindon...@gmail.com
> > > > > > > >:
> > > > > > > > > > > >> > > > > > > > Hey Alexey,
> > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > >> > > > > > > > Thanks. I think we agreed that the
> > > suggested
> > > > > > > > solution
> > > > > > > > > > > >> doesn't
> > > > > > > > > > > >> > > work
> > > > > > > > > > > >> > > > in
> > > > > > > > > > > >> > > > > > > > general for kafka users. To answer
> your
> > > > > > questions:
> > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > >> > > > > > > > 1. I agree we need quota to rate limit
> > > > replica
> > > > > > > > > movement
> > > > > > > > > > > >> when a
> > > > > > > > > > > >> > > > broker
> > > > > > > > > > > >> > > > > > is
> > > > > > > > > > > >> > > > > > > > moving a "leader" replica. I will come
> > up
> > > > with
> > > > > > > > > solution,
> > > > > > > > > > > >> > probably
> > > > > > > > > > > >> > > > > > re-use
> > > > > > > > > > > >> > > > > > > > the config of replication quota
> > introduced
> > > > in
> > > > > > > > KIP-73.
> > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > >> > > > > > > > 2. Good point. I agree that this is a
> > > > problem
> > > > > in
> > > > > > > > > > general.
> > > > > > > > > > > > >> If there is
> > > > > > > > > > > >> > > no
> > > > > > > > > > > >> > > > > new
> > > > > > > > > > > >> > > > > > > data
> > > > > > > > > > > >> > > > > > > > on that broker, with current default
> > value
> > > > of
> > > > > > > > > > > >> > > > > > replica.fetch.wait.max.ms
> > > > > > > > > > > >> > > > > > > > and replica.fetch.max.bytes, the
> replica
> > > > will
> > > > > be
> > > > > > > > moved
> > > > > > > > > > at
> > > > > > > > > > > >> only
> > > > > > > > > > > >> > 2
> > > > > > > > > > > >> > > > MBps
> > > > > > > > > > > >> > > > > > > > throughput. I think the solution is
> for
> > > > broker
> > > > > > to
> > > > > > > > set
> > > > > > > > > > > >> > > > > > > > replica.fetch.wait.max.ms to 0 in its
> > > > > > > FetchRequest
> > > > > > > > if
> > > > > > > > > > the
> > > > > > > > > > > >> > > > > > corresponding
> > > > > > > > > > > >> > > > > > > > ReplicaFetcherThread needs to move
> some
> > > > > replica
> > > > > > to
> > > > > > > > > > another
> > > > > > > > > > > >> > disk.
> > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > >> > > > > > > > 3. I have updated the KIP to mention
> > that
> > > > the
> > > > > > read
> > > > > > > > > size
> > > > > > > > > > > of a
> > > > > > > > > > > >> > > given
> > > > > > > > > > > >> > > > > > > > partition is configured using
> > > > > > > > replica.fetch.max.bytes
> > > > > > > > > > when
> > > > > > > > > > > >> we
> > > > > > > > > > > >> > > move
> > > > > > > > > > > >> > > > > > > replicas
> > > > > > > > > > > >> > > > > > > > between disks.
> > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > >> > > > > > > > Please see this
> > > > > > > > > > > >> > > > > > > > <https://cwiki.apache.org/conf
> > > > > > > > > > > >> luence/pages/diffpagesbyversio
> > > > > > > > > > > >> > > > n.action
> > > > > > > > > > > >> > > > > ?
> > > > > > > > > > > >> > > > > > > pageId=67638408&selectedPageVe
> > > > > > > > > > > rsions=4&selectedPageVersions=
> > > > > > > > > > > >> 5>
> > > > > > > > > > > >> > > > > > > > for the change of the KIP. I will come
> > up
> > > > > with a
> > > > > > > > > > solution
> > > > > > > > > > > to
> > > > > > > > > > > >> > > > throttle
> > > > > > > > > > > >> > > > > > > > replica movement when a broker is
> > moving a
> > > > > > > "leader"
> > > > > > > > > > > replica.
> > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > >> > > > > > > Thanks. It looks great.
> > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM,
> Alexey
> > > > > > Ozeritsky
> > > > > > > <
> > > > > > > > > > > >> > > > > > aozerit...@yandex.ru>
> > > > > > > > > > > >> > > > > > > > wrote:
> > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > >> > > > > > > >>  23.01.2017, 22:11, "Dong Lin" <
> > > > > > > > lindon...@gmail.com
> > > > > > > > > >:
> > > > > > > > > > > >> > > > > > > >>  > Thanks. Please see my comment
> > inline.
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  > On Mon, Jan 23, 2017 at 6:45 AM,
> > > Alexey
> > > > > > > > Ozeritsky
> > > > > > > > > <
> > > > > > > > > > > >> > > > > > > aozerit...@yandex.ru>
> > > > > > > > > > > >> > > > > > > >>  > wrote:
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  >> 13.01.2017, 22:29, "Dong Lin" <
> > > > > > > > > lindon...@gmail.com
> > > > > > > > > > >:
> > > > > > > > > > > >> > > > > > > >>  >> > Hey Alexey,
> > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > >> > > > > > > >>  >> > Thanks for your review and the
> > > > > > alternative
> > > > > > > > > > > approach.
> > > > > > > > > > > >> > Here
> > > > > > > > > > > >> > > is
> > > > > > > > > > > >> > > > > my
> > > > > > > > > > > >> > > > > > > >>  >> > understanding of your patch.
> > > kafka's
> > > > > > > > background
> > > > > > > > > > > >> threads
> > > > > > > > > > > >> > > are
> > > > > > > > > > > >> > > > > used
> > > > > > > > > > > >> > > > > > > to
> > > > > > > > > > > >> > > > > > > >>  move
> > > > > > > > > > > >> > > > > > > >>  >> > data between replicas. When
> data
> > > > > movement
> > > > > > > is
> > > > > > > > > > > >> triggered,
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > > log
> > > > > > > > > > > >> > > > > > > will
> > > > > > > > > > > >> > > > > > > >>  be
> > > > > > > > > > > >> > > > > > > >>  >> > rolled and the new logs will be
> > put
> > > > in
> > > > > > the
> > > > > > > > new
> > > > > > > > > > > >> > directory,
> > > > > > > > > > > >> > > > and
> > > > > > > > > > > >> > > > > > > >>  background
> > > > > > > > > > > >> > > > > > > >>  >> > threads will move segment from
> > old
> > > > > > > directory
> > > > > > > > to
> > > > > > > > > > new
> > > > > > > > > > > >> > > > directory.
> > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > >> > > > > > > >>  >> > It is important to note that
> > > KIP-112
> > > > is
> > > > > > > > > intended
> > > > > > > > > > to
> > > > > > > > > > > >> work
> > > > > > > > > > > >> > > > with
> > > > > > > > > > > >> > > > > > > >>  KIP-113 to
> > > > > > > > > > > >> > > > > > > >>  >> > support JBOD. I think your
> > solution
> > > > is
> > > > > > > > > definitely
> > > > > > > > > > > >> > simpler
> > > > > > > > > > > >> > > > and
> > > > > > > > > > > >> > > > > > > better
> > > > > > > > > > > >> > > > > > > >>  >> under
> > > > > > > > > > > >> > > > > > > >>  >> > the current kafka
> implementation
> > > > that a
> > > > > > > > broker
> > > > > > > > > > will
> > > > > > > > > > > >> fail
> > > > > > > > > > > >> > > if
> > > > > > > > > > > >> > > > > any
> > > > > > > > > > > >> > > > > > > disk
> > > > > > > > > > > >> > > > > > > >>  >> fails.
> > > > > > > > > > > >> > > > > > > >>  >> > But I am not sure if we want to
> > > allow
> > > > > > > broker
> > > > > > > > to
> > > > > > > > > > run
> > > > > > > > > > > >> with
> > > > > > > > > > > >> > > > > partial
> > > > > > > > > > > >> > > > > > > >>  disks
> > > > > > > > > > > > >> > > > > > > >>  >> > failure. Let's say a
> replica
> > is
> > > > > being
> > > > > > > > moved
> > > > > > > > > > > from
> > > > > > > > > > > >> > > > > log_dir_old
> > > > > > > > > > > >> > > > > > > to
> > > > > > > > > > > >> > > > > > > >>  >> > log_dir_new and then
> log_dir_old
> > > > stops
> > > > > > > > working
> > > > > > > > > > due
> > > > > > > > > > > to
> > > > > > > > > > > >> > disk
> > > > > > > > > > > >> > > > > > > failure.
> > > > > > > > > > > >> > > > > > > >>  How
> > > > > > > > > > > >> > > > > > > >>  >> > would your existing patch
> handle
> > > it?
> > > > > To
> > > > > > > make
> > > > > > > > > the
> > > > > > > > > > > >> > > scenario a
> > > > > > > > > > > >> > > > > bit
> > > > > > > > > > > >> > > > > > > more
> > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > >> > > > > > > >>  >> We will lose log_dir_old. After
> > > broker
> > > > > > > restart
> > > > > > > > we
> > > > > > > > > > can
> > > > > > > > > > > >> read
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > > > data
> > > > > > > > > > > >> > > > > > > >>  from
> > > > > > > > > > > >> > > > > > > >>  >> log_dir_new.
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  > No, you probably can't. This is
> > > because
> > > > > the
> > > > > > > > broker
> > > > > > > > > > > >> doesn't
> > > > > > > > > > > >> > > have
> > > > > > > > > > > >> > > > > > > *all* the
> > > > > > > > > > > >> > > > > > > >>  > data for this partition. For
> > example,
> > > > say
> > > > > > the
> > > > > > > > > broker
> > > > > > > > > > > has
> > > > > > > > > > > > >> > > > > > > >>  > partition_segment_1,
> > > > partition_segment_50
> > > > > > and
> > > > > > > > > > > >> > > > > > partition_segment_100
> > > > > > > > > > > >> > > > > > > on
> > > > > > > > > > > >> > > > > > > >>  the
> > > > > > > > > > > >> > > > > > > >>  > log_dir_old.
> partition_segment_100,
> > > > which
> > > > > > has
> > > > > > > > the
> > > > > > > > > > > latest
> > > > > > > > > > > >> > > data,
> > > > > > > > > > > >> > > > > has
> > > > > > > > > > > >> > > > > > > been
> > > > > > > > > > > >> > > > > > > >>  > moved to log_dir_new, and the
> > > > log_dir_old
> > > > > > > fails
> > > > > > > > > > before
> > > > > > > > > > > >> > > > > > > >>  partition_segment_50
> > > > > > > > > > > > >> > > > > > > >>  > and partition_segment_1 are moved
> to
> > > > > > > log_dir_new.
> > > > > > > > > > When
> > > > > > > > > > > >> > broker
> > > > > > > > > > > >> > > > > > > re-starts,
> > > > > > > > > > > >> > > > > > > >>  it
> > > > > > > > > > > >> > > > > > > >>  > won't have partition_segment_50.
> > This
> > > > > causes
> > > > > > > > > problem
> > > > > > > > > > > if
> > > > > > > > > > > >> > > broker
> > > > > > > > > > > >> > > > is
> > > > > > > > > > > >> > > > > > > elected
> > > > > > > > > > > >> > > > > > > >>  > leader and consumer wants to
> consume
> > > > data
> > > > > in
> > > > > > > the
> > > > > > > > > > > >> > > > > > partition_segment_1.
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  Right.
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  >> > complicated, let's say the
> broker
> > > is
> > > > > > > > > shutdown,
> > > > > > > > > > > >> > > log_dir_old's
> > > > > > > > > > > >> > > > > > disk
> > > > > > > > > > > >> > > > > > > >>  fails,
> > > > > > > > > > > >> > > > > > > >>  >> > and the broker starts. In this
> > case
> > > > > > broker
> > > > > > > > > > doesn't
> > > > > > > > > > > >> even
> > > > > > > > > > > >> > > know
> > > > > > > > > > > >> > > > > if
> > > > > > > > > > > >> > > > > > > >>  >> log_dir_new
> > > > > > > > > > > >> > > > > > > >>  >> > has all the data needed for
> this
> > > > > replica.
> > > > > > > It
> > > > > > > > > > > becomes
> > > > > > > > > > > >> a
> > > > > > > > > > > >> > > > problem
> > > > > > > > > > > >> > > > > > if
> > > > > > > > > > > >> > > > > > > the
> > > > > > > > > > > >> > > > > > > >>  >> > broker is elected leader of
> this
> > > > > > partition
> > > > > > > in
> > > > > > > > > > this
> > > > > > > > > > > >> case.
> > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > >> > > > > > > >>  >> log_dir_new contains the most
> > recent
> > > > data
> > > > > > so
> > > > > > > we
> > > > > > > > > > will
> > > > > > > > > > > >> lose
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > > tail
> > > > > > > > > > > >> > > > > > > of
> > > > > > > > > > > >> > > > > > > >>  >> partition.
> > > > > > > > > > > >> > > > > > > >>  >> This is not a big problem for us
> > > > because
> > > > > we
> > > > > > > > > already
> > > > > > > > > > > >> delete
> > > > > > > > > > > >> > > > tails
> > > > > > > > > > > >> > > > > > by
> > > > > > > > > > > >> > > > > > > >>  hand
> > > > > > > > > > > >> > > > > > > >>  >> (see
> > https://issues.apache.org/jira
> > > > > > > > > > > /browse/KAFKA-1712
> > > > > > > > > > > >> ).
> > > > > > > > > > > > >> > > > > > > >>  >> Also we don't use automatic
> leader
> > > > > > balancing
> > > > > > > > > > > >> > > > > > > >>  (auto.leader.rebalance.enable=
> false),
> > > > > > > > > > > >> > > > > > > >>  >> so this partition becomes the
> > leader
> > > > > with a
> > > > > > > low
> > > > > > > > > > > >> > probability.
> > > > > > > > > > > >> > > > > > > >>  >> I think my patch can be modified
> to
> > > > > > prohibit
> > > > > > > > the
> > > > > > > > > > > >> selection
> > > > > > > > > > > >> > > of
> > > > > > > > > > > >> > > > > the
> > > > > > > > > > > >> > > > > > > >>  leader
> > > > > > > > > > > > >> > > > > > > >>  >> until the partition has moved
> > > > > > completely.
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  > I guess you are saying that you
> have
> > > > > deleted
> > > > > > > the
> > > > > > > > > > tails
> > > > > > > > > > > >> by
> > > > > > > > > > > >> > > hand
> > > > > > > > > > > >> > > > in
> > > > > > > > > > > >> > > > > > > your
> > > > > > > > > > > >> > > > > > > >>  own
> > > > > > > > > > > >> > > > > > > >>  > kafka branch. But KAFKA-1712 is
> not
> > > > > accepted
> > > > > > > > into
> > > > > > > > > > > Kafka
> > > > > > > > > > > >> > trunk
> > > > > > > > > > > >> > > > > and I
> > > > > > > > > > > >> > > > > > > am
> > > > > > > > > > > >> > > > > > > >>  not
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  No. We just modify segments mtime by
> > > cron
> > > > > job.
> > > > > > > > This
> > > > > > > > > > > works
> > > > > > > > > > > >> > with
> > > > > > > > > > > >> > > > > > vanilla
> > > > > > > > > > > >> > > > > > > >>  kafka.
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  > sure if it is the right solution.
> > How
> > > > > would
> > > > > > > this
> > > > > > > > > > > >> solution
> > > > > > > > > > > >> > > > address
> > > > > > > > > > > >> > > > > > the
> > > > > > > > > > > >> > > > > > > >>  > problem mentioned above?
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  If you need only fresh data and if
> you
> > > > > remove
> > > > > > > old
> > > > > > > > > data
> > > > > > > > > > > by
> > > > > > > > > > > > >> > hand
> > > > > > > > > > > >> > > > > this
> > > > > > > > > > > >> > > > > > is
> > > > > > > > > > > >> > > > > > > >>  not a problem. But in general case
> > > > > > > > > > > >> > > > > > > >>  this is a problem of course.
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  > BTW, I am not sure the solution
> > > > mentioned
> > > > > in
> > > > > > > > > > > KAFKA-1712
> > > > > > > > > > > >> is
> > > > > > > > > > > >> > > the
> > > > > > > > > > > >> > > > > > right
> > > > > > > > > > > >> > > > > > > way
> > > > > > > > > > > >> > > > > > > >>  to
> > > > > > > > > > > >> > > > > > > >>  > address its problem. Now that we
> > have
> > > > > > > timestamp
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > >> > > message
> > > > > > > > > > > >> > > > we
> > > > > > > > > > > >> > > > > > > can use
> > > > > > > > > > > > >> > > > > > > >>  > that to delete old segment
> instead
> > of
> > > > > > relying
> > > > > > > > on
> > > > > > > > > > the
> > > > > > > > > > > >> log
> > > > > > > > > > > >> > > > segment
> > > > > > > > > > > >> > > > > > > mtime.
> > > > > > > > > > > >> > > > > > > >>  > Just some idea and we don't have
> to
> > > > > discuss
> > > > > > > this
> > > > > > > > > > > problem
> > > > > > > > > > > >> > > here.
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > >> > > > > > > >>  >> > The solution presented in the
> KIP
> > > > > > attempts
> > > > > > > to
> > > > > > > > > > > handle
> > > > > > > > > > > >> it
> > > > > > > > > > > >> > by
> > > > > > > > > > > >> > > > > > > replacing
> > > > > > > > > > > >> > > > > > > >>  >> > replica in an atomic version
> > > fashion
> > > > > > after
> > > > > > > > the
> > > > > > > > > > log
> > > > > > > > > > > in
> > > > > > > > > > > >> > the
> > > > > > > > > > > >> > > > new
> > > > > > > > > > > >> > > > > > dir
> > > > > > > > > > > >> > > > > > > has
> > > > > > > > > > > >> > > > > > > >>  >> fully
> > > > > > > > > > > >> > > > > > > >>  >> > caught up with the log in the
> old
> > > > dir.
> > > > > At
> > > > > > > that
> > > > > > > > > time
> > > > > > > > > > > the
> > > > > > > > > > > >> > log
> > > > > > > > > > > >> > > > can
> > > > > > > > > > > >> > > > > be
> > > > > > > > > > > >> > > > > > > >>  >> considered
> > > > > > > > > > > >> > > > > > > >>  >> > to exist on only one log
> > directory.
> > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > >> > > > > > > >>  >> As I understand your solution
> does
> > > not
> > > > > > cover
> > > > > > > > > > quotas.
> > > > > > > > > > > >> > > > > > > >>  >> What happens if someone starts to
> > > > > transfer
> > > > > > > 100
> > > > > > > > > > > >> partitions
> > > > > > > > > > > >> > ?
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  > Good point. Quota can be
> implemented
> > > in
> > > > > the
> > > > > > > > > future.
> > > > > > > > > > It
> > > > > > > > > > > >> is
> > > > > > > > > > > >> > > > > currently
> > > > > > > > > > > > >> > > > > > > >>  > mentioned as a potential future
> > > > > > improvement
> > > > > > > > in
> > > > > > > > > > > >> KIP-112
> > > > > > > > > > > >> > > > > > > >>  > <https://cwiki.apache.org/conf
> > > > > > > > > > > luence/display/KAFKA/KIP-
> > > > > > > > > > > >> > 112%3
> > > > > > > > > > > >> > > > > > > >>  A+Handle+disk+failure+for+
> > JBOD>.Thanks
> > > > > > > > > > > >> > > > > > > >>  > for the reminder. I will move it
> to
> > > > > KIP-113.
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  >> > If yes, it will read a
> > > > > > ByteBufferMessageSet
> > > > > > > > > from
> > > > > > > > > > > >> > > > > > > topicPartition.log
> > > > > > > > > > > >> > > > > > > >>  and
> > > > > > > > > > > >> > > > > > > >>  >> append the message set to
> > > > > > topicPartition.move
> > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > >> > > > > > > >>  >> i.e. processPartitionData will
> read
> > > > data
> > > > > > from
> > > > > > > > the
> > > > > > > > > > > >> > beginning
> > > > > > > > > > > >> > > of
> > > > > > > > > > > >> > > > > > > >>  >> topicPartition.log? What is the
> > read
> > > > > size?
> > > > > > > > > > > > >> > > > > > > >>  >> A ReplicaFetcherThread reads many
> > > > > partitions
> > > > > > so
> > > > > > > > if
> > > > > > > > > > one
> > > > > > > > > > > >> does
> > > > > > > > > > > >> > > some
> > > > > > > > > > > >> > > > > > > >>  complicated
> > > > > > > > > > > >> > > > > > > >>  >> work (= read a lot of data from
> > disk)
> > > > > > > > everything
> > > > > > > > > > will
> > > > > > > > > > > >> slow
> > > > > > > > > > > >> > > > down.
> > > > > > > > > > > >> > > > > > > >>  >> I think read size should not be
> > very
> > > > big.
> > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > >> > > > > > > >>  >> On the other hand at this point
> > > > > > > > > > > (processPartitionData)
> > > > > > > > > > > >> one
> > > > > > > > > > > >> > > can
> > > > > > > > > > > >> > > > > use
> > > > > > > > > > > >> > > > > > > only
> > > > > > > > > > > >> > > > > > > >>  >> the new data
> (ByteBufferMessageSet
> > > from
> > > > > > > > > parameters)
> > > > > > > > > > > and
> > > > > > > > > > > >> > wait
> > > > > > > > > > > >> > > > > until
> > > > > > > > > > > >> > > > > > > >>  >> (topicPartition.move.
> > smallestOffset
> > > <=
> > > > > > > > > > > >> > > > > > > topicPartition.log.smallestOff
> > > > > > > > > > > >> > > > > > > >>  set
> > > > > > > > > > > >> > > > > > > >>  >> && topicPartition.log.
> > largestOffset
> > > ==
> > > > > > > > > > > >> > > > > > > topicPartition.log.largestOffs
> > > > > > > > > > > >> > > > > > > >>  et).
> > > > > > > > > > > >> > > > > > > >>  >> In this case the write speed to
> > > > > > > > > topicPartition.move
> > > > > > > > > > > and
> > > > > > > > > > > >> > > > > > > >>  topicPartition.log
> > > > > > > > > > > >> > > > > > > >>  >> will be the same so this will
> allow
> > > us
> > > > to
> > > > > > > move
> > > > > > > > > many
> > > > > > > > > > > >> > > partitions
> > > > > > > > > > > >> > > > > to
> > > > > > > > > > > >> > > > > > > one
> > > > > > > > > > > >> > > > > > > >>  disk.
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  > The read size of a given partition
> > is
> > > > > > > configured
> > > > > > > > > > > >> > > > > > > >>  > using replica.fetch.max.bytes,
> which
> > > is
> > > > > the
> > > > > > > same
> > > > > > > > > > size
> > > > > > > > > > > >> used
> > > > > > > > > > > >> > by
> > > > > > > > > > > >> > > > > > > >>  FetchRequest
> > > > > > > > > > > >> > > > > > > >>  > from follower to leader. If the
> > broker
> > > > is
> > > > > > > > moving a
> > > > > > > > > > > >> replica
> > > > > > > > > > > >> > > for
> > > > > > > > > > > >> > > > > > which
> > > > > > > > > > > >> > > > > > > it
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  OK. Could you mention it in KIP?
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  > acts as a follower, the disk write
> > > rate
> > > > > for
> > > > > > > > moving
> > > > > > > > > > > this
> > > > > > > > > > > >> > > replica
> > > > > > > > > > > >> > > > > is
> > > > > > > > > > > >> > > > > > at
> > > > > > > > > > > >> > > > > > > >>  most
> > > > > > > > > > > >> > > > > > > >>  > the rate it fetches from leader
> > > (assume
> > > > it
> > > > > > is
> > > > > > > > > > catching
> > > > > > > > > > > >> up
> > > > > > > > > > > >> > and
> > > > > > > > > > > >> > > > has
> > > > > > > > > > > >> > > > > > > >>  > sufficient data to read from
> leader,
> > > > which
> > > > > > is
> > > > > > > > > > subject
> > > > > > > > > > > to
> > > > > > > > > > > >> > > > > > > round-trip-time
> > > > > > > > > > > >> > > > > > > >>  > between itself and the leader.
> Thus
> > > this
> > > > > > part
> > > > > > > is
> > > > > > > > > > > >> probably
> > > > > > > > > > > >> > > fine
> > > > > > > > > > > >> > > > > even
> > > > > > > > > > > >> > > > > > > >>  without
> > > > > > > > > > > >> > > > > > > >>  > quota.
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  I think there are 2 problems
> > > > > > > > > > > >> > > > > > > >>  1. Without speed limiter this will
> not
> > > > work
> > > > > > well
> > > > > > > > > even
> > > > > > > > > > > for
> > > > > > > > > > > >> 1
> > > > > > > > > > > >> > > > > > partition.
> > > > > > > > > > > >> > > > > > > In
> > > > > > > > > > > >> > > > > > > >>  our production we had a problem so
> we
> > > did
> > > > > the
> > > > > > > > > > throughput
> > > > > > > > > > > >> > limiter:
> > > > > > > > > > > >> > > > > > > >>  https://github.com/resetius/ka
> > > > > > > > > > > >> fka/commit/cda31dadb2f135743bf
> > > > > > > > > > > >> > > > > > > >>  41083062927886c5ddce1#diff-ffa
> > > > > > > > > > > >> 8861e850121997a534ebdde2929c6R
> > > > > > > > > > > >> > > 713
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > >> > > > > > > >>  2. I don't understand how it will
> work
> > in
> > > > > case
> > > > > > of
> > > > > > > > big
> > > > > > > > > > > >> > > > > > > >>  replica.fetch.wait.max.ms and
> > partition
> > > > > with
> > > > > > > > > > irregular
> > > > > > > > > > > >> flow.
> > > > > > > > > > > >> > > > > > > >>  For example someone could have
> > > > > > > > > > > replica.fetch.wait.max.ms
> > > > > > > > > > > > >> > =10 minutes
> > > > > > > > > > > >> > > > > > and
> > > > > > > > > > > >> > > > > > > >>  partition that has very high data
> flow
> > > > from
> > > > > > > 12:00
> > > > > > > > to
> > > > > > > > > > > 13:00
> > > > > > > > > > > >> > and
> > > > > > > > > > > >> > > > zero
> > > > > > > > > > > >> > > > > > > flow
> > > > > > > > > > > >> > > > > > > >>  otherwise.
> > > > > > > > > > > >> > > > > > > >>  In this case processPartitionData
> > could
> > > be
> > > > > > > called
> > > > > > > > > once
> > > > > > > > > > > per
> > > > > > > > > > > >> > > > > 10minutes
> > > > > > > > > > > >> > > > > > > so if
> > > > > > > > > > > > >> > > > > > > >>  we start data moving at 13:01 it
> will
> > be
> > > > > > > finished
> > > > > > > > > next
> > > > > > > > > > > >> day.
> > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > >> > > > > > > >>  > But if the broker is moving a
> > replica
> > > > for
> > > > > > > which
> > > > > > > > it
> > > > > > > > > > > acts
> > > > > > > > > > > >> as
> > > > > > > > > > > >> > a
> > > > > > > > > > > >> > > > > > leader,
> > > > > > > > > > > >> > > > > > > as
> > > > > > > > > > > >> > > > > > > >>  of
> > > > > > > > > > > >> > > > > > > >>  > current KIP the broker will keep
> > > reading
> > > > > > from
> > > > > > > > > > > >> log_dir_old
> > > > > > > > > > > >> > and
> > > > > > > > > > > >> > > > > > append
> > > > > > > > > > > >> > > > > > > to
> > > > > > > > > > > >> > > > > > > >>  > log_dir_new without having to wait
> > for
> > > > > > > > > > > round-trip-time.
> > > > > > > > > > > >> We
> > > > > > > > > > > >> > > > > probably
> > > > > > > > > > > >> > > > > > > need
> > > > > > > > > > > >> > > > > > > >>  > quota for this in the future.
> > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > >> > > > > > > >>  >> > And to answer your question,
> yes
> > > > > > > > > > topicpartition.log
> > > > > > > > > > > >> > refers
> > > > > > > > > > > >> > > > to
> > > > > > > > > > > > >> > > > > > > >>  >> > topic-partition/segment.log.
> > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > >> > > > > > > >>  >> > Thanks,
> > > > > > > > > > > >> > > > > > > >>  >> > Dong
> > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > >> > > > > > > >>  >> > On Fri, Jan 13, 2017 at 4:12
> AM,
> > > > Alexey
> > > > > > > > > > Ozeritsky <
> > > > > > > > > > > >> > > > > > > >>  aozerit...@yandex.ru>
> > > > > > > > > > > >> > > > > > > >>  >> > wrote:
> > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > >> > > > > > > >>  >> >> Hi,
> > > > > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > > > > > >> > > > > > > >>  >> >> We have a similar solution
> > that
> > > > has
> > > > > > > been
> > > > > > > > > > > working
> > > > > > > > > > > >> in
> > > > > > > > > > > >> > > > > > production
> > > > > > > > > > > >> > > > > > > >>  since
> > > > > > > > > > > >> > > > > > > >>  >> >> 2014. You can see it here:
> > > > > > > > > > > >> > > https://github.com/resetius/ka
> > > > > > > > > > > >> > > > > > > >>  >> >> fka/commit/20658593e246d218490
> > > > > > > > > > > 6879defa2e763c4d413fb
> > > > > > > > > > > >> > > > > > > >>  >> >> The idea is very simple
> > > > > > > > > > > >> > > > > > > >>  >> >> 1. Disk balancer runs in a
> > > separate
> > > > > > thread
> > > > > > > > > > inside
> > > > > > > > > > > >> > > scheduler
> > > > > > > > > > > >> > > > > > pool.
> > > > > > > > > > > >> > > > > > > >>  >> >> 2. It does not touch empty
> > > > partitions
> > > > > > > > > > > >> > > > > > > >>  >> >> 3. Before it moves a partition
> > it
> > > > > > forcibly
> > > > > > > > > > creates
> > > > > > > > > > > >> new
> > > > > > > > > > > >> > > > > segment
> > > > > > > > > > > >> > > > > > > on a
> > > > > > > > > > > >> > > > > > > >>  >> >> destination disk
> > > > > > > > > > > >> > > > > > > >>  >> >> 4. It moves segment by segment
> > > from
> > > > > new
> > > > > > to
> > > > > > > > > old.
> > > > > > > > > > > >> > > > > > > >>  >> >> 5. Log class works with
> segments
> > > on
> > > > > both
> > > > > > > > disks
> > > > > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > > > > >> > > > > > > >>  >> >> Your approach seems too
> > > complicated,
> > > > > > > > moreover
> > > > > > > > > it
> > > > > > > > > > > >> means
> > > > > > > > > > > >> > > that
> > > > > > > > > > > >> > > > > you
> > > > > > > > > > > >> > > > > > > >>  have to
> > > > > > > > > > > >> > > > > > > >>  >> >> patch different components of
> > the
> > > > > system
> > > > > > > > > > > > >> > > > > > > >>  >> >> Could you clarify what you
> > mean
> > > > by
> > > > > > > > > > > >> > topicPartition.log?
> > > > > > > > > > > >> > > > Is
> > > > > > > > > > > >> > > > > it
> > > > > > > > > > > > >> > > > > > > >>  >> >> topic-partition/segment.log ?
> > > > > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > > > > >> > > > > > > >>  >> >> 12.01.2017, 21:47, "Dong Lin"
> <
> > > > > > > > > > > lindon...@gmail.com
> > > > > > > > > > > >> >:
> > > > > > > > > > > >> > > > > > > >>  >> >> > Hi all,
> > > > > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > > > > > >> > > > > > > >>  >> >> > We created KIP-113: Support replicas movement between log directories.
> > > > > > > > > > > > >> > > > > > > >>  >> >> > Please find the KIP wiki in the link
> > > > > > > > > > > > >> > > > > > > >>  >> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
> > > > > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > > > > > >> > > > > > > >>  >> >> > This KIP is related to KIP-112: Handle disk failure for JBOD
> > > > > > > > > > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>.
> > > > > > > > > > > > >> > > > > > > >>  >> >> > They are needed in order to support JBOD in Kafka. Please help review
> > > > > > > > > > > > >> > > > > > > >>  >> >> > the KIP. Your feedback is appreciated!
> > > > > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > > > > >> > > > > > > >>  >> >> > Thanks,
> > > > > > > > > > > >> > > > > > > >>  >> >> > Dong
> > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
