Hey Jun,

Thanks much for the comment! Do you think we can start the vote for KIP-112 and
KIP-113 if there are no further concerns?

Dong

On Thu, Mar 30, 2017 at 10:40 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Ok, so it seems that in solution (2), if the tool exits successfully, then
> we know for sure that all replicas will be in the right log dirs. Solution
> (1) doesn't guarantee that. That seems better and we can go with your
> current solution then.
>
> Thanks,
>
> Jun
>
> On Fri, Mar 24, 2017 at 4:28 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > No.. the current approach described in the KIP (see here
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreassignreplicabetweenlogdirectoriesacrossbrokers>)
> > also sends ChangeReplicaDirRequest before writing reassignment path in ZK.
> > I think we are discussing whether ChangeReplicaDirResponse (1) shows success or
> > (2) should specify ReplicaNotAvailableException, if replica has not been
> > created yet.
> >
> > Since both solutions will send ChangeReplicaDirRequest before writing
> > reassignment in ZK, their chance of creating replica in the right directory
> > is the same.
> >
> > To take care of the rarer case that some brokers go down immediately after
> > the reassignment tool is run, solution (1) requires reassignment tool to
> > repeatedly send DescribeDirRequest and ChangeReplicaDirRequest, while
> > solution (2) requires tool to only retry ChangeReplicaDirRequest if the
> > response says ReplicaNotAvailableException. It seems that solution (2) is
> > cleaner because ChangeReplicaDirRequest won't depend on DescribeDirRequest.
> > What do you think?
> >
> > Thanks,
> > Dong
> >
> >
> > On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > We are just comparing whether it's better for the reassignment tool to
> > > send ChangeReplicaDirRequest
> > > (1) before or (2) after writing the reassignment path in ZK .
> > >
> > > In the case when all brokers are alive when the reassignment tool is
> run,
> > > (1) guarantees 100% that the new replicas will be in the right log dirs
> > and
> > > (2) can't.
> > >
> > > In the rarer case that some brokers go down immediately after the
> > > reassignment tool is run, in either approach, there is a chance when
> the
> > > failed broker comes back, it will complete the pending reassignment
> > process
> > > by putting some replicas in the wrong log dirs.
> > >
> > > Implementation wise, (1) and (2) seem to be the same. So, it seems to
> me
> > > that (1) is better?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks much for the response! I agree with you that if multiple replicas
> > > > are created in the wrong directory, we may waste resources if either
> > > > replicaMoveThread number is low or intra.broker.throttled.rate is low.
> > > > Then the question is whether the suggested approach increases the chance of
> > > > replicas being created in the correct log directory.
> > > >
> > > > I think the answer is no due to the argument provided in the previous
> > > > email. Sending ChangeReplicaDirRequest before updating znode has
> > > negligible
> > > > impact on the chance that the broker processes
> ChangeReplicaDirRequest
> > > > before LeaderAndIsrRequest from controller. If we still worry about
> the
> > > > order they are sent, the reassignment tool can first send
> > > > ChangeReplicaDirRequest (so that broker remembers it in memory),
> create
> > > > reassignment znode, and then retry ChangeReplicaDirRequest if the
> > > previous
> > > > ChangeReplicaDirResponse says the replica has not been created. This
> > > should
> > > > give us the highest possible chance of creating replica in the
> correct
> > > > directory and avoid the problem of the suggested approach. I have
> > updated
> > > > "How
> > > > to reassign replica between log directories across brokers" in the
> KIP
> > to
> > > > explain this procedure.
> > > >
> > > > To answer your question, the reassignment tool should fail with a proper
> > > > error message if user has specified log directory for a replica on an
> > > > offline broker.  This is reasonable because reassignment tool can not
> > > > guarantee that the replica will be moved to the specified log
> directory
> > > if
> > > > the broker is offline. If all brokers are online, the reassignment tool may
> > > > hang for up to 10 seconds (by default) to retry ChangeReplicaDirRequest if any
> > > > replica has not been created already. User can change this timeout
> > value
> > > > using the newly-added --timeout argument of the reassignment tool.
> This
> > > is
> > > > specified in the Public Interface section in the KIP. The
> reassignment
> > > tool
> > > > will only block if user uses this new feature of reassigning replica
> > to a
> > > > specific log directory in the broker. Therefore it seems backward
> > > > compatible.
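
The procedure described above (send ChangeReplicaDirRequest first, create the reassignment znode, then retry until the replica exists or the timeout expires) can be sketched roughly as follows. This is only an illustration, not the tool's actual code: `send_change_dir` and `create_znode` are hypothetical callables standing in for the real RPC and the ZooKeeper write, and `ReplicaNotAvailable` is a stand-in exception. The 10 second timeout and 0.5 second backoff are the values mentioned in this thread.

```python
import time


class ReplicaNotAvailable(Exception):
    """Stand-in for a response carrying ReplicaNotAvailableException."""


def reassign_with_log_dir(send_change_dir, create_znode, timeout_s=10.0,
                          backoff_s=0.5, clock=time.monotonic, sleep=time.sleep):
    """Return the number of ChangeReplicaDirRequest attempts that were made.

    send_change_dir and create_znode are hypothetical callables; the real
    tool would issue the RPC and write the reassignment path in ZK instead.
    """
    deadline = clock() + timeout_s

    # First attempt: lets the broker remember the destination dir in memory.
    try:
        send_change_dir()
        created = True
    except ReplicaNotAvailable:
        created = False

    # Only now trigger the cross-broker reassignment via the controller.
    create_znode()

    attempts = 1
    while not created:
        if clock() >= deadline:
            raise TimeoutError("replica was not created before the timeout")
        sleep(backoff_s)  # back off so the broker is not flooded with retries
        try:
            send_change_dir()
            created = True
        except ReplicaNotAvailable:
            pass
        attempts += 1
    return attempts
```

The clock and sleep functions are passed in only so that the loop can be exercised without real waiting; a real tool would not need that indirection.
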
> > > >
> > > > Does this address the concern?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > 11.2 I think there are a few reasons why the cross disk movement
> may
> > > not
> > > > > catch up if the replicas are created in the wrong log dirs to start
> > > with.
> > > > > (a) There could be more replica fetcher threads than the disk
> > movement
> > > > > threads. (b) intra.broker.throttled.rate may be configured lower
> than
> > > the
> > > > > replica throttle rate. That's why I think getting the replicas
> > created
> > > in
> > > > > the right log dirs will be better.
> > > > >
> > > > > For the corner case issue that you mentioned, I am not sure if the
> > > > approach
> > > > > in the KIP completely avoids that. If a broker is down when the
> > > partition
> > > > > reassignment tool is started, does the tool just hang (keep
> retrying
> > > > > ChangeReplicaDirRequest) until the broker comes back? Currently,
> the
> > > > > partition reassignment tool doesn't block.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Tue, Mar 21, 2017 at 11:24 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks for the explanation. Please see below my thoughts.
> > > > > >
> > > > > > 10. I see. So you are concerned with the potential implementation
> > > > > > complexity which I wasn't aware of. I think it is OK not to do
> log
> > > > > > cleaning on the .move log since there can be only one such log in
> > > each
> > > > > > directory. I have updated the KIP to specify this:
> > > > > >
> > > > > > "The log segments in topicPartition.move directory will be
> subject
> > to
> > > > log
> > > > > > truncation, log retention in the same way as the log segments in
> > the
> > > > > source
> > > > > > log directory. But we may not do log cleaning on the
> > > > topicPartition.move
> > > > > to
> > > > > > simplify the implementation."
> > > > > >
> > > > > > > 11.2 Now I get your point. I think we have slightly different expectations
> > > > > > > of the order in which the reassignment tool updates the reassignment node in
> > > > > > > ZK and sends ChangeReplicaDirRequest.
> > > > > >
> > > > > > I think the reassignment tool should first create reassignment
> > znode
> > > > and
> > > > > > then keep sending ChangeReplicaDirRequest until success. I think
> > > > sending
> > > > > > ChangeReplicaDirRequest before updating znode has negligible
> impact
> > > on
> > > > > the
> > > > > > chance that the broker processes ChangeReplicaDirRequest before
> > > > > > LeaderAndIsrRequest from controller, because the time for
> > controller
> > > to
> > > > > > receive ZK notification, handle state machine changes and send
> > > > > > LeaderAndIsrRequests should be much longer than the time for
> > > > reassignment
> > > > > > tool to setup connection with broker and send
> > > ChangeReplicaDirRequest.
> > > > > Even
> > > > > > if broker receives LeaderAndIsrRequest a bit sooner, the data in
> > the
> > > > > > > original replica should be small enough for .move log to catch up very
> > > > > > quickly, so that broker can swap the log soon after it receives
> > > > > > ChangeReplicaDirRequest -- otherwise the
> > intra.broker.throttled.rate
> > > is
> > > > > > probably too small. Does this address your concern with the
> > > > performance?
> > > > > >
> > > > > > One concern with the suggested approach is that the
> > > > > ChangeReplicaDirRequest
> > > > > > may be lost if broker crashes before it creates the replica. I
> > agree
> > > it
> > > > > is
> > > > > > rare. But it will be confusing when it happens. Operators would
> > have
> > > to
> > > > > > keep verifying reassignment and possibly retry execution until
> > > success
> > > > if
> > > > > > they want to make sure that the ChangeReplicaDirRequest is
> > executed.
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 21, 2017 at 8:37 AM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > > > Hi, Dong,
> > > > > > >
> > > > > > > 10. I was mainly concerned about the additional complexity
> needed
> > > to
> > > > > > > support log cleaning in the .move log. For example, LogToClean
> is
> > > > keyed
> > > > > > off
> > > > > > > TopicPartition. To be able to support cleaning different
> > instances
> > > of
> > > > > the
> > > > > > > > same partition, we need additional logic. I am not sure how much additional
> > > > > > > > complexity is needed and whether it's worth it. If we don't do
> > log
> > > > > > cleaning
> > > > > > > at all on the .move log, then we don't have to change the log
> > > > cleaner's
> > > > > > > code.
> > > > > > >
> > > > > > > 11.2 I was thinking of the following flow. In the execute
> phase,
> > > the
> > > > > > > reassignment tool first issues a ChangeReplicaDirRequest to
> > brokers
> > > > > where
> > > > > > > new replicas will be created. The brokers remember the mapping
> > and
> > > > > > return a
> > > > > > > successful code. The reassignment tool then initiates the cross
> > > > broker
> > > > > > > movement through the controller. In the verify phase, in
> addition
> > > to
> > > > > > > checking the replica assignment at the brokers, it issues
> > > > > > > DescribeDirsRequest to check the replica to log dirs mapping.
> For
> > > > each
> > > > > > > partition in the response, the broker returns a state to
> indicate
> > > > > whether
> > > > > > > the replica is final, temporary or pending. If all replicas are
> > in
> > > > the
> > > > > > > final state, the tool checks if all replicas are in the
> expected
> > > log
> > > > > > dirs.
> > > > > > > If they are not, output a warning (and perhaps suggest the
> users
> > to
> > > > > move
> > > > > > > the data again). However, this should be rare.
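
The verify-phase check described above could look roughly like the sketch below. It is an illustration under stated assumptions: the 0/1/2 state encoding is the one floated elsewhere in this thread, the meaning given to "pending" in the comment is a guess (the thread itself leaves it open), and `verify_log_dirs` with its dictionary inputs is a hypothetical helper, not the DescribeDirsResponse wire format.

```python
from enum import IntEnum


class ReplicaState(IntEnum):
    FINAL = 0      # replica is created and serving requests
    TEMPORARY = 1  # replica is a copy that is still catching up
    PENDING = 2    # assumed: target dir remembered, replica not created yet


def verify_log_dirs(expected, described):
    """Compare expected log dirs with what the brokers report.

    expected:  {replica: log_dir} requested by the user
    described: {replica: (log_dir, state)} as reported by the brokers
    Returns (status, misplaced_replicas).
    """
    # Placement is only checked once every replica has reached the final state.
    if any(state != ReplicaState.FINAL for _, state in described.values()):
        return "in progress", []
    misplaced = sorted(
        replica for replica, log_dir in expected.items()
        if described.get(replica, (None, None))[0] != log_dir
    )
    return ("warning" if misplaced else "ok"), misplaced
```

A "warning" result corresponds to the case where the tool would suggest moving the data again.
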
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin <
> lindon...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hey Jun,
> > > > > > > >
> > > > > > > > Thanks for the response! It seems that we have only two
> > remaining
> > > > > > issues.
> > > > > > > > Please see my reply below.
> > > > > > > >
> > > > > > > > On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao <j...@confluent.io>
> > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Dong,
> > > > > > > > >
> > > > > > > > > Thanks for the update. A few replies inlined below.
> > > > > > > > >
> > > > > > > > > On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <
> > > lindon...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hey Jun,
> > > > > > > > > >
> > > > > > > > > > Thanks for your comment! Please see my reply below.
> > > > > > > > > >
> > > > > > > > > > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <
> j...@confluent.io
> > >
> > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Dong,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > >
> > > > > > > > > > > 10. Could you comment on that?
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Sorry, I missed that comment.
> > > > > > > > > >
> > > > > > > > > > Good point. I think the log segments in
> topicPartition.move
> > > > > > directory
> > > > > > > > > will
> > > > > > > > > > be subject to log truncation, log retention and log
> > cleaning
> > > in
> > > > > the
> > > > > > > > same
> > > > > > > > > > way as the log segments in the source log directory. I
> just
> > > > > > specified
> > > > > > > > > this
> > > > > > > > > > > in the KIP.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > This is ok, but doubles the overhead of log cleaning. We
> > > probably
> > > > > > want
> > > > > > > to
> > > > > > > > > think a bit more on this.
> > > > > > > > >
> > > > > > > >
> > > > > > > > I think this is OK because the number of replicas that are
> > being
> > > > > moved
> > > > > > is
> > > > > > > > > limited by the number of ReplicaMoveThread. The default number of
> > > > > > > > > ReplicaMoveThread is the number of log directories, which means we incur
> > > > > > > > > this overhead for at most one replica per log directory at any time.
> > > > > > > > > Suppose there are more than 100 replicas in any log directory, the increase
> > > > > > > > > in overhead is less than 1%.
> > > > > > > >
> > > > > > > > Another way to look at this is that this is no worse than
> > replica
> > > > > > > > > reassignment. When we reassign replica from one broker to another, we will
> > > > > > > > > double the overhead of log cleaning in the cluster for this replica. If we
> > > > > > > > > are OK with this then we are OK with replica movement between log
> > > > > > > > > directories.
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > 11.2 "I am concerned that the ChangeReplicaDirRequest
> > would
> > > > be
> > > > > > lost
> > > > > > > > if
> > > > > > > > > > > broker
> > > > > > > > > > > restarts after it sends ChangeReplicaDirResponse but
> > before
> > > > it
> > > > > > > > receives
> > > > > > > > > > > LeaderAndIsrRequest."
> > > > > > > > > > >
> > > > > > > > > > > In that case, the reassignment tool could detect that
> > > through
> > > > > > > > > > > DescribeDirsRequest
> > > > > > > > > > > and issue ChangeReplicaDirRequest again, right? In the
> > > common
> > > > > > case,
> > > > > > > > > this
> > > > > > > > > > is
> > > > > > > > > > > probably not needed and we only need to write each
> > replica
> > > > > once.
> > > > > > > > > > >
> > > > > > > > > > > My main concern with the approach in the current KIP is
> > > that
> > > > > > once a
> > > > > > > > new
> > > > > > > > > > > replica is created in the wrong log dir, the cross log
> > > > > directory
> > > > > > > > > movement
> > > > > > > > > > > may not catch up until the new replica is fully
> > > bootstrapped.
> > > > > So,
> > > > > > > we
> > > > > > > > > end
> > > > > > > > > > up
> > > > > > > > > > > writing the data for the same replica twice.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I agree with your concern. My main concern is that it is
> a
> > > bit
> > > > > > weird
> > > > > > > if
> > > > > > > > > > ChangeReplicaDirResponse can not guarantee success and
> the
> > > tool
> > > > > > needs
> > > > > > > > to
> > > > > > > > > > rely on DescribeDirResponse to see if it needs to send
> > > > > > > > > > ChangeReplicaDirRequest again.
> > > > > > > > > >
> > > > > > > > > > > How about this: If broker does not already have the replica created for the
> > > > > > > > > > > specified topicPartition when it receives ChangeReplicaDirRequest, it will
> > > > > > > > > > > reply ReplicaNotAvailableException AND remember (replica, destination log
> > > > > > > > > > > directory) pair in memory to create the replica in the specified log
> > > > > > > > > > > directory.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > I am not sure if returning ReplicaNotAvailableException is
> > > > useful?
> > > > > > What
> > > > > > > > > will the client do on receiving
> ReplicaNotAvailableException
> > in
> > > > > this
> > > > > > > > case?
> > > > > > > > >
> > > > > > > > > > Perhaps we could just replace the is_temporary field in
> > > > > > > > > > DescribeDirsResponsePartition with a state field. We can use 0 to indicate
> > > > > > > > > > the partition is created, 1 to indicate the partition is temporary and 2 to
> > > > > > > > > > indicate that the partition is pending.
> > > > > > > > >
> > > > > > > >
> > > > > > > > ReplicaNotAvailableException is useful because the client can
> > > > re-send
> > > > > > > > ChangeReplicaDirRequest (with backoff) after receiving
> > > > > > > > ReplicaNotAvailableException in the response.
> > > > ChangeReplicaDirRequest
> > > > > > > will
> > > > > > > > only succeed after replica has been created for the specified
> > > > > partition
> > > > > > > in
> > > > > > > > the broker.
> > > > > > > >
> > > > > > > > > I think this is cleaner than asking reassignment tool to detect that
> > > > > > > > > through DescribeDirsRequest and issue ChangeReplicaDirRequest again. Both
> > > > > > > > > solutions have the same chance of writing the data for the same replica
> > > > > > > > > twice. In the original solution, the reassignment tool will keep retrying
> > > > > > > > > ChangeReplicaDirRequest until success. In the second suggested solution,
> > > > > > > > > the reassignment tool needs to send ChangeReplicaDirRequest, send
> > > > > > > > > DescribeDirsRequest to verify result, and retry ChangeReplicaDirRequest and
> > > > > > > > > DescribeDirsRequest again if the replica hasn't been created already. Thus
> > > > > > > > > the second solution couples ChangeReplicaDirRequest with
> > > > > > > > > DescribeDirsRequest and makes the tool's logic a bit more complicated.
> > > > > > > >
> > > > > > > > > Besides, I am not sure I understand your suggestion for the is_temporary
> > > > > > > > > field. It seems that a replica can have only two states, i.e. normal if it
> > > > > > > > > is being used to serve fetch/produce requests and temporary if it is a
> > > > > > > > > replica that is catching up with the normal one. If you think we should have
> > > > > > > > > reassignment tool send DescribeDirsRequest before retrying
> > > > > > > > > ChangeReplicaDirRequest, can you elaborate a bit on what the "pending"
> > > > > > > > > state is?
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > 11.3 Are you saying the value in --throttle will be used to set both
> > > > > > > > > > > > intra.broker.throttled.rate and
> > > > > > > > > > > > leader.follower.replication.throttled.replicas?
> > > > > > > > > >
> > > > > > > > > > > No. --throttle will be used only to set leader.follower.replication as
> > > > > > > > > > > it does now. I think we do not need any option in the
> > > > > > > > > > > kafka-reassign-partitions.sh to specify intra.broker.throttled.rate.
> > > > > > > > > > > User can set it in broker config or dynamically using kafka-configs.sh. Does
> > > > > > > > > > > this sound OK?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > Ok. This sounds good. It would be useful to make this clear
> > in
> > > > the
> > > > > > > wiki.
> > > > > > > > >
> > > > > > > > > Sure. I have updated the wiki to specify this: "the quota specified by the
> > > > > > > > > argument `--throttle` will be applied to only inter-broker replica
> > > > > > > > > reassignment. It does not affect the quota for replica movement between log
> > > > > > > > > directories".
> > > > > > > >
> > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > 12.2 If the user only wants to check one topic, the
> tool
> > > > could
> > > > > do
> > > > > > > the
> > > > > > > > > > > filtering on the client side, right? My concern with
> > having
> > > > > both
> > > > > > > > > log_dirs
> > > > > > > > > > > and topics is the semantic. For example, if both are
> not
> > > > empty,
> > > > > > do
> > > > > > > we
> > > > > > > > > > > return the intersection or the union?
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > Yes the tool could filter on the client side. But the purpose of having
> > > > > > > > > > > this field is to reduce response size in case broker has a lot of topics.
> > > > > > > > > > > Both fields are used as filters and the result is the intersection. Do you
> > > > > > > > > > > think this semantic is confusing or counter-intuitive?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Ok. Could we document the semantic when both dirs and
> topics
> > > are
> > > > > > > > specified?
> > > > > > > > >
> > > > > > > >
> > > > > > > > Sure. I have updated the wiki to specify this: "log_dirs and
> > > topics
> > > > > are
> > > > > > > > used to filter the results to include only the specified
> > > > > log_dir/topic.
> > > > > > > The
> > > > > > > > result is the intersection of both filters".
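
The filter semantics quoted above (intersection of both filters, with null meaning "all" and an empty list meaning "nothing", as discussed for 12.1 elsewhere in this thread) can be illustrated with a small sketch. `describe_dirs` and its dictionary layout are hypothetical and only model the behaviour; they are not the broker's implementation or the request schema.

```python
def describe_dirs(replicas_by_dir, log_dirs=None, topics=None):
    """Filter a {log_dir: [(topic, partition), ...]} map.

    None for a filter means "no filtering" (return everything); an empty
    list matches nothing. When both filters are given, a replica must
    satisfy both, i.e. the result is the intersection.
    """
    result = {}
    for log_dir, replicas in replicas_by_dir.items():
        if log_dirs is not None and log_dir not in log_dirs:
            continue  # directory excluded by the log_dirs filter
        result[log_dir] = [
            (topic, partition)
            for (topic, partition) in replicas
            if topics is None or topic in topics
        ]
    return result
```

For example, asking for log dir `/d1` and topic `a` returns only the partitions of `a` that live in `/d1`, which keeps the response small when a broker hosts many topics.
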
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <
> > > > lindon...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks much for your detailed comments. Please see my
> > > reply
> > > > > > > below.
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <
> > > j...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the updated KIP. Some more comments
> below.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 10. For the .move log, do we perform any segment
> > > deletion
> > > > > > > (based
> > > > > > > > on
> > > > > > > > > > > > > retention) or log cleaning (if a compacted topic)?
> Or
> > > do
> > > > we
> > > > > > > only
> > > > > > > > > > enable
> > > > > > > > > > > > > that after the swap?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 11. kafka-reassign-partitions.sh
> > > > > > > > > > > > > 11.1 If all reassigned replicas are in the current
> > > broker
> > > > > and
> > > > > > > > only
> > > > > > > > > > the
> > > > > > > > > > > > log
> > > > > > > > > > > > > directories have changed, we can probably optimize
> > the
> > > > tool
> > > > > > to
> > > > > > > > not
> > > > > > > > > > > > trigger
> > > > > > > > > > > > > partition reassignment through the controller and
> > only
> > > > > > > > > > > > > send ChangeReplicaDirRequest.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > Yes, the reassignment script should not create the reassignment znode if no
> > > > > > > > > > > > > replicas are to be moved between brokers. This falls into the "How to move
> > > > > > > > > > > > > replica between log directories on the same broker" of the Proposed Change
> > > > > > > > > > > > > section.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > 11.2 If ChangeReplicaDirRequest specifies a replica
> > > > that's
> > > > > > not
> > > > > > > > > > created
> > > > > > > > > > > > yet,
> > > > > > > > > > > > > could the broker just remember that in memory and
> > > create
> > > > > the
> > > > > > > > > replica
> > > > > > > > > > > when
> > > > > > > > > > > > > the creation is requested? This way, when doing
> > cluster
> > > > > > > > expansion,
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > > > make sure that the new replicas on the new brokers
> > are
> > > > > > created
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > > right
> > > > > > > > > > > > > log directory in the first place. We can also avoid
> > the
> > > > > tool
> > > > > > > > having
> > > > > > > > > > to
> > > > > > > > > > > > keep
> > > > > > > > > > > > > issuing ChangeReplicaDirRequest in response to
> > > > > > > > > > > > > ReplicaNotAvailableException.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > I am concerned that the ChangeReplicaDirRequest would
> > be
> > > > lost
> > > > > > if
> > > > > > > > > broker
> > > > > > > > > > > > restarts after it sends ChangeReplicaDirResponse but
> > > before
> > > > > it
> > > > > > > > > receives
> > > > > > > > > > > > LeaderAndIsrRequest. In this case, the user will
> > receive
> > > > > > success
> > > > > > > > when
> > > > > > > > > > > they
> > > > > > > > > > > > initiate replica reassignment, but replica
> reassignment
> > > > will
> > > > > > > never
> > > > > > > > > > > complete
> > > > > > > > > > > > when they verify the reassignment later. This would
> be
> > > > > > confusing
> > > > > > > to
> > > > > > > > > > user.
> > > > > > > > > > > >
> > > > > > > > > > > > > There are three different approaches to this problem if broker has not
> > > > > > > > > > > > > created replica yet after it receives ChangeReplicaDirRequest:
> > > > > > > > > > > >
> > > > > > > > > > > > 1) Broker immediately replies to user with
> > > > > > > > > ReplicaNotAvailableException
> > > > > > > > > > > and
> > > > > > > > > > > > user can decide to retry again later. The advantage
> of
> > > this
> > > > > > > > solution
> > > > > > > > > is
> > > > > > > > > > > > that the broker logic is very simple and the
> > reassignment
> > > > > > script
> > > > > > > > > logic
> > > > > > > > > > > also
> > > > > > > > > > > > seems straightforward. The disadvantage is that user
> > > script
> > > > > has
> > > > > > > to
> > > > > > > > > > retry.
> > > > > > > > > > > > > But it seems fine - we can set interval between retries to be 0.5 sec so
> > > > > > > > > > > > > that broker won't be bombarded by those requests. This is the solution
> > > > > > > > > > > > > chosen in the current KIP.
> > > > > > > > > > > >
> > > > > > > > > > > > 2) Broker can put ChangeReplicaDirRequest in a
> > purgatory
> > > > with
> > > > > > > > timeout
> > > > > > > > > > and
> > > > > > > > > > > > replies to user after the replica has been created. I
> > > > didn't
> > > > > > > choose
> > > > > > > > > > this
> > > > > > > > > > > in
> > > > > > > > > > > > the interest of keeping broker logic simpler.
> > > > > > > > > > > >
> > > > > > > > > > > > 3) Broker can remember that by making a mark in the
> > disk,
> > > > > e.g.
> > > > > > > > create
> > > > > > > > > > > > topicPartition.tomove directory in the destination
> log
> > > > > > directory.
> > > > > > > > > This
> > > > > > > > > > > mark
> > > > > > > > > > > > will be persisted across broker restart. This is the
> > > first
> > > > > > idea I
> > > > > > > > had
> > > > > > > > > > > but I
> > > > > > > > > > > > replaced it with solution 1) in the interest of
> keeping
> > > > > broker
> > > > > > > > > simple.
> > > > > > > > > > > >
> > > > > > > > > > > > It seems that solution 1) is the simplest one that
> > works.
> > > > > But I
> > > > > > > am
> > > > > > > > OK
> > > > > > > > > > to
> > > > > > > > > > > > switch to the other two solutions if we don't want
> the
> > > > retry
> > > > > > > logic.
> > > > > > > > > > What
> > > > > > > > > > > do
> > > > > > > > > > > > you think?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > > 11.3 Do we need an option in the tool to specify
> > > > > > > > > > > > > > intra.broker.throttled.rate?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > I don't find it useful to add this option to
> > > > > > > > > > > kafka-reassign-partitions.sh.
> > > > > > > > > > > > The reason we have the option "--throttle" in the
> > script
> > > to
> > > > > > > > throttle
> > > > > > > > > > > > replication rate is that we usually want higher quota
> > to
> > > > fix
> > > > > an
> > > > > > > > > offline
> > > > > > > > > > > > replica to get out of URP. But we are OK to have a
> > lower
> > > > > quota
> > > > > > if
> > > > > > > > we
> > > > > > > > > > are
> > > > > > > > > > > > moving replica only to balance the cluster. Thus it
> is
> > > > common
> > > > > > for
> > > > > > > > SRE
> > > > > > > > > > to
> > > > > > > > > > > > use different quota when using
> > > kafka-reassign-partitions.sh
> > > > > to
> > > > > > > move
> > > > > > > > > > > replica
> > > > > > > > > > > > between brokers.
> > > > > > > > > > > >
> > > > > > > > > > > > However, the only reason for moving replica between
> log
> > > > > > > directories
> > > > > > > > > of
> > > > > > > > > > > the
> > > > > > > > > > > > same broker is to balance cluster resource. Thus the
> > > option
> > > > > to
> > > > > > > > > > > > specify intra.broker.throttled.rate in the tool is
> not
> > > that
> > > > > > > > useful. I
> > > > > > > > > > am
> > > > > > > > > > > > inclined not to add this option to keep this tool's
> > usage
> > > > > > > simpler.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > 12. DescribeDirsRequest
> > > > > > > > > > > > > 12.1 In other requests like CreateTopicRequest, we
> > > return
> > > > > an
> > > > > > > > empty
> > > > > > > > > > list
> > > > > > > > > > > > in
> > > > > > > > > > > > > the response for an empty input list. If the input
> > list
> > > > is
> > > > > > > null,
> > > > > > > > we
> > > > > > > > > > > > return
> > > > > > > > > > > > > everything. We should probably follow the same
> > > convention
> > > > > > here.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks. I wasn't aware of this convention. I have changed
> > > > > > > > > > > > > DescribeDirsRequest so that "null" indicates "all".
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > 12.2 Do we need the topics field? Since the request
> > is
> > > > > about
> > > > > > > log
> > > > > > > > > > dirs,
> > > > > > > > > > > it
> > > > > > > > > > > > > makes sense to specify the log dirs. But it's weird
> > to
> > > > > > specify
> > > > > > > > > > topics.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > The topics field is not necessary. But it is useful to reduce the response
> > > > > > > > > > > > > size in case users are only interested in the status of a few topics. For
> > > > > > > > > > > > example, user may have initiated the reassignment of
> a
> > > > given
> > > > > > > > replica
> > > > > > > > > > from
> > > > > > > > > > > > one log directory to another log directory on the
> same
> > > > > broker,
> > > > > > > and
> > > > > > > > > the
> > > > > > > > > > > user
> > > > > > > > > > > > only wants to check the status of this given
> partition
> > by
> > > > > > looking
> > > > > > > > > > > > at DescribeDirsResponse. Thus this field is useful.
> > > > > > > > > > > >
> > > > > > > > > > > > > I am not sure if it is weird to call this request DescribeDirsRequest. The
> > > > > > > > > > > > > response is a map from log directory to information about some partitions on
> > > > > > > > > > > > > the log directory. Do you think we need to change the name of the request?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > > 12.3 DescribeDirsResponsePartition: Should we include firstOffset and
> > > > > > > > > > > > > > nextOffset in the response? That could be useful to track the progress of
> > > > > > > > > > > > > > the movement.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > Yeah good point. I agree it is useful to include logEndOffset in the
> > > > > > > > > > > > > response. According to Log.scala doc the logEndOffset is equivalent to the
> > > > > > > > > > > > > nextOffset. User can track progress by checking the difference between
> > > > > > > > > > > > > logEndOffset of the given partition in the source and destination log
> > > > > > > > > > > > > directories. I have added logEndOffset to the DescribeDirsResponsePartition
> > > > > > > > > > > > > in the KIP.
> > > > > > > > > > > >
> > > > > > > > > > > > > But it seems that we don't need firstOffset in the response. Do you think
> > > > > > > > > > > > > firstOffset is still needed?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 13. ChangeReplicaDirResponse: Do we need error code at both levels?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > My bad. It is not needed. I have removed request level error code. I also
> > > > > > > > > > > > > added ChangeReplicaDirRequestTopic and ChangeReplicaDirResponseTopic to
> > > > > > > > > > > > > reduce duplication of the "topic" string in the request and response.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 14. num.replica.move.threads: Does it default to # log dirs?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > No. It doesn't. I expect the default number to be set to a conservative value
> > > > > > > > > > > > > such as 3. It may be surprising to users if the number of threads increases
> > > > > > > > > > > > > just because they have assigned more log directories to the Kafka broker.
> > > > > > > > > > > >
> > > > > > > > > > > > > It seems that the number of replica move threads doesn't have to depend on
> > > > > > > > > > > > > the number of log directories. It is possible to have one thread that moves
> > > > > > > > > > > > > replicas across all log directories. On the other hand we can have multiple
> > > > > > > > > > > > > threads to move replicas to the same log directory. For example, if broker
> > > > > > > > > > > > > uses SSD, the CPU instead of disk IO may be the replica move bottleneck and
> > > > > > > > > > > > > it will be faster to move replicas using multiple threads per log
> > > > > > > > > > > > > directory.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jun
> > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > I just made one correction in the KIP. If broker receives
> > > > > > > > > > > > > > > ChangeReplicaDirRequest and the replica hasn't been created there, the
> > > > > > > > > > > > > > > broker will respond with ReplicaNotAvailableException.
> > > > > > > > > > > > > > > The kafka-reassign-partitions.sh tool will need to re-send
> > > > > > > > > > > > > > > ChangeReplicaDirRequest in this case in order to wait for the controller to
> > > > > > > > > > > > > > > send LeaderAndIsrRequest to the broker. The previous approach of creating an
> > > > > > > > > > > > > > > empty directory seems hacky.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for your comments! I have updated the KIP to address your comments.
> > > > > > > > > > > > > > > > Please see my reply inline.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Can you let me know if the latest KIP has addressed your comments?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Hi, Dong,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks for the reply.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> 1.3 So the thread gets the lock, checks if caught up and releases the lock
> > > > > > > > > > > > > > > >> if not? Then, in the case when there is continuous incoming data, the
> > > > > > > > > > > > > > > >> thread may never get a chance to swap. One way to address this is when the
> > > > > > > > > > > > > > > >> thread is getting really close in catching up, just hold onto the lock
> > > > > > > > > > > > > > > >> until the thread fully catches up.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Yes, that was my original solution. I see your point that the lock may not
> > > > > > > > > > > > > > > > be fairly assigned to ReplicaMoveThread and RequestHandlerThread when there
> > > > > > > > > > > > > > > > are frequent incoming requests. Your solution should address the problem and I
> > > > > > > > > > > > > > > > have updated the KIP to use it.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> 2.3 So, you are saying that the partition reassignment tool can first send
> > > > > > > > > > > > > > > >> a ChangeReplicaDirRequest to relevant brokers to establish the log dir for
> > > > > > > > > > > > > > > >> replicas not created yet, then trigger the partition movement across
> > > > > > > > > > > > > > > >> brokers through the controller? That's actually a good idea. Then, we can
> > > > > > > > > > > > > > > >> just leave LeaderAndIsrRequest as it is.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Yes, that is what I plan to do. If broker receives a
> > > > > > > > > > > > > > > > ChangeReplicaDirRequest while it is not leader or follower of the
> > > > > > > > > > > > > > > > partition, the broker will create an empty Log instance (i.e. a directory
> > > > > > > > > > > > > > > > named topicPartition) in the destination log directory so that the replica
> > > > > > > > > > > > > > > > will be placed there when broker receives LeaderAndIsrRequest from the
> > > > > > > > > > > > > > > > controller. The broker should clean up those empty Log instances on startup
> > > > > > > > > > > > > > > > just in case a ChangeReplicaDirRequest was mistakenly sent to a broker that
> > > > > > > > > > > > > > > > was not meant to be follower/leader of the partition.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >> Another thing related to ChangeReplicaDirRequest.
> > > > > > > > > > > > > > > >> Since this request may take long to complete, I am not sure if we should
> > > > > > > > > > > > > > > >> wait for the movement to complete before responding. While waiting for the
> > > > > > > > > > > > > > > >> movement to complete, the idle connection may be killed or the client may
> > > > > > > > > > > > > > > >> be gone already. An alternative is to return immediately and add a new
> > > > > > > > > > > > > > > >> request like CheckReplicaDirRequest to see if the movement has completed.
> > > > > > > > > > > > > > > >> The tool can take advantage of that to check the status.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I agree with your concern and solution. We need a request to query the
> > > > > > > > > > > > > > > > partition -> log_directory mapping on the broker. I have updated the KIP to
> > > > > > > > > > > > > > > > remove the need for ChangeReplicaDirRequestPurgatory.
> > > > > > > > > > > > > > > > Instead, kafka-reassign-partitions.sh will send DescribeDirsRequest
> > > > > > > > > > > > > > > > to brokers when the user wants to verify the partition assignment. Since we
> > > > > > > > > > > > > > > > need this DescribeDirsRequest anyway, we can also use this request to
> > > > > > > > > > > > > > > > expose stats like the individual log size instead of using JMX. One
> > > > > > > > > > > > > > > > drawback of using JMX is that users have to manage the JMX port and related
> > > > > > > > > > > > > > > > credentials if they haven't already done this, which is the case at
> > > > > > > > > > > > > > > > LinkedIn.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Jun
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Hey Jun,
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > Thanks for the detailed explanation. I will use the separate thread pool to
> > > > > > > > > > > > > > > >> > move replica between log directories. I will let you know when the KIP has
> > > > > > > > > > > > > > > >> > been updated to use a separate thread pool.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Here is my response to your other questions:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > 1.3 My idea is that the ReplicaMoveThread that moves data should get the
> > > > > > > > > > > > > > > >> > lock before checking whether the replica in the destination log directory
> > > > > > > > > > > > > > > >> > has caught up. If the new replica has caught up, then the ReplicaMoveThread
> > > > > > > > > > > > > > > >> > should swap the replica while it is still holding the lock. The
> > > > > > > > > > > > > > > >> > ReplicaFetcherThread or RequestHandlerThread will not be able to append
> > > > > > > > > > > > > > > >> > data to the replica in the source log directory during this period because
> > > > > > > > > > > > > > > >> > they cannot get the lock. Does this address the problem?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > 2.3 I get your point that we want to keep controller simpler. If admin tool
> > > > > > > > > > > > > > > >> > can send ChangeReplicaDirRequest to move data within a broker, then
> > > > > > > > > > > > > > > >> > controller probably doesn't even need to include log directory path in the
> > > > > > > > > > > > > > > >> > LeaderAndIsrRequest. How about this: controller will only deal with
> > > > > > > > > > > > > > > >> > reassignment across brokers as it does now. If user specified a destination
> > > > > > > > > > > > > > > >> > disk for any replica, the admin tool will send ChangeReplicaDirRequest and
> > > > > > > > > > > > > > > >> > wait for response from broker to confirm that all replicas have been moved
> > > > > > > > > > > > > > > >> > to the destination log directory. The broker will put
> > > > > > > > > > > > > > > >> > ChangeReplicaDirRequest in a purgatory and respond either when the movement
> > > > > > > > > > > > > > > >> > is completed or when the request has timed out.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > 4. I agree that we can expose these metrics via JMX. But I am not sure if
> > > > > > > > > > > > > > > >> > it can be obtained easily with good performance using either existing tools
> > > > > > > > > > > > > > > >> > or new script in kafka. I will ask SREs for their opinion.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > > > >> > Dong
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > > Hi, Dong,
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > Thanks for the updated KIP. A few more comments below.
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > 1.1 and 1.2: I am still not sure there is enough benefit of reusing
> > > > > > > > > > > > > > > >> > > ReplicaFetchThread to move data across disks.
> > > > > > > > > > > > > > > >> > > (a) A big part of ReplicaFetchThread is to deal with issuing and tracking
> > > > > > > > > > > > > > > >> > > fetch requests. So, it doesn't feel that we get much from reusing
> > > > > > > > > > > > > > > >> > > ReplicaFetchThread only to disable the fetching part.
> > > > > > > > > > > > > > > >> > > (b) The leader replica has no ReplicaFetchThread to start with. It feels
> > > > > > > > > > > > > > > >> > > weird to start one just for intra broker data movement.
> > > > > > > > > > > > > > > >> > > (c) The ReplicaFetchThread is per broker. Intuitively, the number of
> > > > > > > > > > > > > > > >> > > threads doing intra broker data movement should be related to the number of
> > > > > > > > > > > > > > > >> > > disks in the broker, not the number of brokers in the cluster.
> > > > > > > > > > > > > > > >> > > (d) If the destination disk fails, we want to stop the intra broker data
> > > > > > > > > > > > > > > >> > > movement, but want to continue inter broker replication. So, logically, it
> > > > > > > > > > > > > > > >> > > seems it's better to separate out the two.
> > > > > > > > > > > > > > > >> > > (e) I am also not sure if we should reuse the existing throttling for
> > > > > > > > > > > > > > > >> > > replication. It's designed to handle traffic across brokers and the
> > > > > > > > > > > > > > > >> > > delaying is done in the fetch request. So, if we are not doing
> > > > > > > > > > > > > > > >> > > fetching in ReplicaFetchThread, I am not sure the existing throttling is
> > > > > > > > > > > > > > > >> > > effective. Also, when specifying the throttling of moving data across
> > > > > > > > > > > > > > > >> > > disks, it seems the user shouldn't care about whether a replica is a leader
> > > > > > > > > > > > > > > >> > > or a follower. Reusing the existing throttling config name will be awkward
> > > > > > > > > > > > > > > >> > > in this regard.
> > > > > > > > > > > > > > > >> > > (f) It seems it's simpler and more consistent to use a separate thread pool
> > > > > > > > > > > > > > > >> > > for local data movement (for both leader and follower replicas). This
> > > > > > > > > > > > > > > >> > > process can then be configured (e.g. number of threads, etc) and throttled
> > > > > > > > > > > > > > > >> > > independently.
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > 1.3 Yes, we will need some synchronization there. So, if the movement
> > > > > > > > > > > > > > > >> > > thread catches up, gets the lock to do the swap, but realizes that new data
> > > > > > > > > > > > > > > >> > > is added, it has to continue catching up while holding the lock?
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > 2.3 The benefit of including the desired log directory in
> > > > > > > > > > > > > > > >> > > LeaderAndIsrRequest during partition reassignment is that the controller
> > > > > > > > > > > > > > > >> > > doesn't need to track the progress for disk movement. So, you don't need the
> > > > > > > > > > > > > > > >> > > additional BrokerDirStateUpdateRequest. Then the controller never needs to
> > > > > > > > > > > > > > > >> > > issue ChangeReplicaDirRequest.
> > > > > > > > > > > > > > > >> > > Only the admin tool will issue ChangeReplicaDirRequest to move data within
> > > > > > > > > > > > > > > >> > > a broker. I agree that this makes LeaderAndIsrRequest more complicated, but
> > > > > > > > > > > > > > > >> > > that seems simpler than changing the controller to track additional states
> > > > > > > > > > > > > > > >> > > during partition reassignment.
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > 4. We want to make a decision on how to expose the stats. So far, we are
> > > > > > > > > > > > > > > >> > > exposing stats like the individual log size as JMX. So, one way is to just
> > > > > > > > > > > > > > > >> > > add new jmx to expose the log directory of individual replicas.
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Jun
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > > Hey Jun,
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > Thanks for all the comments! Please see my answer below. I have updated the
> > > > > > > > > > > > > > > >> > > > KIP to address most of the questions and make the KIP easier to
> > > > > > > > > > > > > > > >> > > > understand.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > > > > >> > > > Dong
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > Hi, Dong,
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > Thanks for the KIP. A few comments below.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > 1. For moving data across directories
> > > > > > > > > > > > > > > >> > > > > 1.1 I am not sure why we want to use ReplicaFetcherThread to move data
> > > > > > > > > > > > > > > >> > > > > around in the leader. ReplicaFetchThread fetches data from socket. For
> > > > > > > > > > > > > > > >> > > > > moving data locally, it seems that we want to avoid the socket overhead.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > The purpose of using ReplicaFetchThread is to re-use an existing thread
> > > > > > > > > > > > > > > >> > > > instead of creating more threads and making our thread model more complex. It
> > > > > > > > > > > > > > > >> > > > seems like a natural choice for copying data between disks since it is
> > > > > > > > > > > > > > > >> > > > similar to copying data between brokers. Another reason is that if the
> > > > > > > > > > > > > > > >> > > > replica to be moved is a follower, we don't need a lock to swap replicas when
> > > > > > > > > > > > > > > >> > > > the destination replica has caught up, since the same thread which is
> > > > > > > > > > > > > > > >> > > > fetching data from leader will swap the replica.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > The ReplicaFetchThread will not incur socket overhead while copying data
> > > > > > > > > > > > > > > >> > > > between disks. It will read directly from source disk (as we do when
> > > > > > > > > > > > > > > >> > > > processing FetchRequest) and write to destination disk (as we do when
> > > > > > > > > > > > > > > >> > > > processing ProduceRequest).
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > > 1.2 I am also not sure about moving data in the ReplicaFetcherThread in the
> > > > > > > > > > > > > > > >> > > > > follower. For example, I am not sure setting replica.fetch.max.wait to 0
> > > > > > > > > > > > > > > >> > > > > is ideal. It may not always be effective since a fetch request in the
> > > > > > > > > > > > > > > >> > > > > ReplicaFetcherThread could be arbitrarily delayed due to replication
> > > > > > > > > > > > > > > >> > > > > throttling on the leader. In general, the data movement logic across disks
> > > > > > > > > > > > > > > >> > > > > seems different from that in ReplicaFetcherThread. So, I am not sure why
> > > > > > > > > > > > > > > >> > > > > they need to be coupled.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > While it may not be the most efficient way to copy data between local
> > > > > > > > > > > > > > > >> > > > disks, it will be at least as efficient as copying data from leader to the
> > > > > > > > > > > > > > > >> > > > destination disk. The expected goal of KIP-113 is to enable data movement
> > > > > > > > > > > > > > > >> > > > between disks with no less efficiency than what we do now when moving data
> > > > > > > > > > > > > > > >> > > > between brokers. I think we can optimize its performance using separate
> > > > > > > > > > > > > > > >> > > > thread if the performance is not good enough.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > 1.3 Could you add a bit more details
> on
> > > how
> > > > we
> > > > > > > swap
> > > > > > > > > the
> > > > > > > > > > > > > replicas
> > > > > > > > > > > > > > >> when
> > > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > > >> > > > > new ones are fully caught up? For
> > example,
> > > > > what
> > > > > > > > > happens
> > > > > > > > > > > when
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> new
> > > > > > > > > > > > > > >> > > > > replica in the new log directory is
> > caught
> > > > up,
> > > > > > but
> > > > > > > > > when
> > > > > > > > > > we
> > > > > > > > > > > > > want
> > > > > > > > > > > > > > >> to do
> > > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > > >> > > > > swap, some new data has arrived?
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > If the replica is a leader, then
> > > > > > > ReplicaFetcherThread
> > > > > > > > > will
> > > > > > > > > > > > > perform
> > > > > > > > > > > > > > >> the
> > > > > > > > > > > > > > >> > > > replacement. Proper lock is needed to
> > > prevent
> > > > > > > > > > > > > KafkaRequestHandler
> > > > > > > > > > > > > > >> from
> > > > > > > > > > > > > > >> > > > appending data to the topicPartition.log
> > on
> > > > the
> > > > > > > source
> > > > > > > > > > disks
> > > > > > > > > > > > > > before
> > > > > > > > > > > > > > >> > this
> > > > > > > > > > > > > > >> > > > replacement is completed by
> > > > > ReplicaFetcherThread.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > If the replica is a follower, because
> the
> > > same
> > > > > > > > > > > > > > ReplicaFetcherThread
> > > > > > > > > > > > > > >> which
> > > > > > > > > > > > > > >> > > > fetches data from leader will also swap
> > the
> > > > > > > replica,
> > > > > > > > no
> > > > > > > > > > > lock
> > > > > > > > > > > > is
> > > > > > > > > > > > > > >> > needed.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > I have updated the KIP to specify both
> > more
> > > > > > > > explicitly.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > 1.4 Do we need to do the .move at the
> > log
> > > > > > segment
> > > > > > > > > level
> > > > > > > > > > or
> > > > > > > > > > > > > could
> > > > > > > > > > > > > > >> we
> > > > > > > > > > > > > > >> > > just
> > > > > > > > > > > > > > >> > > > do
> > > > > > > > > > > > > > >> > > > > that at the replica directory level?
> > > > Renaming
> > > > > > > just a
> > > > > > > > > > > > directory
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > >> > much
> > > > > > > > > > > > > > >> > > > > faster than renaming the log segments.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > Great point. I have updated the KIP to
> > > rename
> > > > > the
> > > > > > > log
> > > > > > > > > > > > directory
> > > > > > > > > > > > > > >> > instead.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > 1.5 Could you also describe a bit what
> > > > happens
> > > > > > > when
> > > > > > > > > > either
> > > > > > > > > > > > the
> > > > > > > > > > > > > > >> source
> > > > > > > > > > > > > > >> > > or
> > > > > > > > > > > > > > >> > > > > the target log directory fails while
> the
> > > > data
> > > > > > > moving
> > > > > > > > > is
> > > > > > > > > > in
> > > > > > > > > > > > > > >> progress?
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > If source log directory fails, then the
> > > > replica
> > > > > > > > movement
> > > > > > > > > > > will
> > > > > > > > > > > > > stop
> > > > > > > > > > > > > > >> and
> > > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > > >> > > > source replica is marked offline. If
> > > > destination
> > > > > > log
> > > > > > > > > > > directory
> > > > > > > > > > > > > > >> fails,
> > > > > > > > > > > > > > >> > > then
> > > > > > > > > > > > > > >> > > > the replica movement will stop. I have
> > > updated
> > > > > the
> > > > > > > KIP
> > > > > > > > > to
> > > > > > > > > > > > > clarify
> > > > > > > > > > > > > > >> this.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > 2. For partition reassignment.
> > > > > > > > > > > > > > >> > > > > 2.1 I am not sure if the controller
> can
> > > > block
> > > > > on
> > > > > > > > > > > > > > >> > > ChangeReplicaDirRequest.
> > > > > > > > > > > > > > >> > > > > Data movement may take a long time to
> > > > > complete.
> > > > > > If
> > > > > > > > > there
> > > > > > > > > > > is
> > > > > > > > > > > > an
> > > > > > > > > > > > > > >> > > > outstanding
> > > > > > > > > > > > > > >> > > > > request from the controller to a
> broker,
> > > > that
> > > > > > > broker
> > > > > > > > > > won't
> > > > > > > > > > > > be
> > > > > > > > > > > > > > >> able to
> > > > > > > > > > > > > > >> > > > > process any new request from the
> > > controller.
> > > > > So
> > > > > > if
> > > > > > > > > > another
> > > > > > > > > > > > > event
> > > > > > > > > > > > > > >> > (e.g.
> > > > > > > > > > > > > > >> > > > > broker failure) happens when the data
> > > > movement
> > > > > > is
> > > > > > > in
> > > > > > > > > > > > progress,
> > > > > > > > > > > > > > >> > > subsequent
> > > > > > > > > > > > > > > >> > > > > LeaderAndIsrRequest will be delayed.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > Yeah good point. I missed the fact that
> > > there
> > > > > can
> > > > > > be
> > > > > > > > only
> > > > > > > > > > one
> > > > > > > > > > > > > > >> inflight
> > > > > > > > > > > > > > >> > > > request from controller to broker.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > How about I add a request, e.g.
> > > > > > > > > > BrokerDirStateUpdateRequest,
> > > > > > > > > > > > > which
> > > > > > > > > > > > > > >> maps
> > > > > > > > > > > > > > >> > > > topicPartition to log directory and can
> be
> > > > sent
> > > > > > from
> > > > > > > > > > broker
> > > > > > > > > > > to
> > > > > > > > > > > > > > >> > controller
> > > > > > > > > > > > > > >> > > > to indicate completion?
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > 2.2 in the KIP, the partition
> > reassignment
> > > > > tool
> > > > > > is
> > > > > > > > > also
> > > > > > > > > > > used
> > > > > > > > > > > > > for
> > > > > > > > > > > > > > >> > cases
> > > > > > > > > > > > > > >> > > > > where an admin just wants to balance
> the
> > > > > > existing
> > > > > > > > data
> > > > > > > > > > > > across
> > > > > > > > > > > > > > log
> > > > > > > > > > > > > > >> > > > > directories in the broker. In this
> case,
> > > it
> > > > > > seems
> > > > > > > > that
> > > > > > > > > > > it's
> > > > > > > > > > > > > > overkill
> > > > > > > > > > > > > > >> > > > to
> > > > > > > > > > > > > > >> > > > > have the process go through the
> > > controller.
> > > > A
> > > > > > > > simpler
> > > > > > > > > > > > approach
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > >> to
> > > > > > > > > > > > > > >> > > > issue
> > > > > > > > > > > > > > >> > > > > an RPC request to the broker directly.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > I agree we can optimize this case. It is
> > > just
> > > > > that
> > > > > > > we
> > > > > > > > > have
> > > > > > > > > > > to
> > > > > > > > > > > > > add
> > > > > > > > > > > > > > >> new
> > > > > > > > > > > > > > >> > > logic
> > > > > > > > > > > > > > >> > > > or code path to handle a scenario that
> is
> > > > > already
> > > > > > > > > covered
> > > > > > > > > > by
> > > > > > > > > > > > the
> > > > > > > > > > > > > > >> more
> > > > > > > > > > > > > > >> > > > complicated scenario. I will add it to
> the
> > > > KIP.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > 2.3 When using the partition
> > reassignment
> > > > tool
> > > > > > to
> > > > > > > > move
> > > > > > > > > > > > > replicas
> > > > > > > > > > > > > > >> > across
> > > > > > > > > > > > > > > >> > > > > brokers, it makes sense to be able to
> > > specify
> > > > > the
> > > > > > > log
> > > > > > > > > > > > directory
> > > > > > > > > > > > > > of
> > > > > > > > > > > > > > >> the
> > > > > > > > > > > > > > >> > > > newly
> > > > > > > > > > > > > > >> > > > > created replicas. The KIP does that in
> > two
> > > > > > > separate
> > > > > > > > > > > requests
> > > > > > > > > > > > > > >> > > > > ChangeReplicaDirRequest and
> > > > > LeaderAndIsrRequest,
> > > > > > > and
> > > > > > > > > > > tracks
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> > > progress
> > > > > > > > > > > > > > >> > > > of
> > > > > > > > > > > > > > >> > > > > each independently. An alternative is
> to
> > > do
> > > > > that
> > > > > > > > just
> > > > > > > > > in
> > > > > > > > > > > > > > >> > > > > LeaderAndIsrRequest.
> > > > > > > > > > > > > > >> > > > > That way, the new replicas will be
> > created
> > > > in
> > > > > > the
> > > > > > > > > right
> > > > > > > > > > > log
> > > > > > > > > > > > > dir
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > >> > the
> > > > > > > > > > > > > > >> > > > > first place and the controller just
> > needs
> > > to
> > > > > > track
> > > > > > > > the
> > > > > > > > > > > > > progress
> > > > > > > > > > > > > > of
> > > > > > > > > > > > > > >> > > > > partition reassignment in the current
> > way.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > I agree it is better to use one request
> > > > instead
> > > > > of
> > > > > > > two
> > > > > > > > > to
> > > > > > > > > > > > > request
> > > > > > > > > > > > > > >> > replica
> > > > > > > > > > > > > > >> > > > movement between disks. But I think the
> > > > > > performance
> > > > > > > > > > > advantage
> > > > > > > > > > > > of
> > > > > > > > > > > > > > >> doing
> > > > > > > > > > > > > > >> > so
> > > > > > > > > > > > > > >> > > > is negligible because we trigger replica
> > > > > > assignment
> > > > > > > > much
> > > > > > > > > > > less
> > > > > > > > > > > > > than
> > > > > > > > > > > > > > >> all
> > > > > > > > > > > > > > >> > > > other kinds of events in the Kafka
> > cluster.
> > > I
> > > > am
> > > > > > not
> > > > > > > > > sure
> > > > > > > > > > > that
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> > > benefit
> > > > > > > > > > > > > > >> > > > of doing this is worth the effort to add
> > an
> > > > > > optional
> > > > > > > > > > string
> > > > > > > > > > > > > field
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > >> > the
> > > > > > > > > > > > > > >> > > > LeaderAndIsrRequest. Also if we add this
> > > > > optional
> > > > > > > > field
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > > > > >> > > > LeaderAndIsrRequest, we probably want to
> > > > remove
> > > > > > > > > > > > > > >> ChangeReplicaDirRequest
> > > > > > > > > > > > > > >> > > to
> > > > > > > > > > > > > > >> > > > avoid having two requests doing the same
> > > > thing.
> > > > > > But
> > > > > > > it
> > > > > > > > > > means
> > > > > > > > > > > > > user
> > > > > > > > > > > > > > >> > script
> > > > > > > > > > > > > > >> > > > can not send request directly to the
> > broker
> > > to
> > > > > > > trigger
> > > > > > > > > > > replica
> > > > > > > > > > > > > > >> movement
> > > > > > > > > > > > > > >> > > > between log directories.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > >> > > > I will do it if you feel strongly about
> this
> > > > > > > > optimization.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > 3. /admin/reassign_partitions:
> Including
> > > the
> > > > > log
> > > > > > > dir
> > > > > > > > > in
> > > > > > > > > > > > every
> > > > > > > > > > > > > > >> replica
> > > > > > > > > > > > > > >> > > may
> > > > > > > > > > > > > > >> > > > > not be efficient. We could include a
> > list
> > > of
> > > > > log
> > > > > > > > > > > directories
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > >> > > > reference
> > > > > > > > > > > > > > >> > > > > the index of the log directory in each
> > > > > replica.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > Good point. I have updated the KIP to
> use
> > > this
> > > > > > > > solution.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > 4. DescribeDirsRequest: The stats in
> the
> > > > > request
> > > > > > > are
> > > > > > > > > > > already
> > > > > > > > > > > > > > >> > available
> > > > > > > > > > > > > > >> > > > from
> > > > > > > > > > > > > > >> > > > > JMX. Do we need the new request?
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > Does JMX also include the state (i.e.
> > > offline
> > > > or
> > > > > > > > online)
> > > > > > > > > > of
> > > > > > > > > > > > each
> > > > > > > > > > > > > > log
> > > > > > > > > > > > > > >> > > > directory and the log directory of each
> > > > replica?
> > > > > > If
> > > > > > > > not,
> > > > > > > > > > > then
> > > > > > > > > > > > > > maybe
> > > > > > > > > > > > > > >> we
> > > > > > > > > > > > > > >> > > > still need DescribeDirsRequest?
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > 5. We want to be consistent on
> > > > > > > > ChangeReplicaDirRequest
> > > > > > > > > > vs
> > > > > > > > > > > > > > >> > > > > ChangeReplicaRequest.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > I think ChangeReplicaRequest and
> > > > > > > ChangeReplicaResponse
> > > > > > > > > is
> > > > > > > > > > my
> > > > > > > > > > > > > typo.
> > > > > > > > > > > > > > >> > Sorry,
> > > > > > > > > > > > > > >> > > > they are fixed now.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > Thanks,
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > Jun
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong
> > Lin <
> > > > > > > > > > > > lindon...@gmail.com
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > >> > > > > > Hey Alexey,
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > Thanks for all the comments!
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > I have updated the KIP to specify
> how
> > we
> > > > > > enforce
> > > > > > > > > > quota.
> > > > > > > > > > > I
> > > > > > > > > > > > > also
> > > > > > > > > > > > > > >> > > updated
> > > > > > > > > > > > > > >> > > > > the
> > > > > > > > > > > > > > >> > > > > > "The thread model and broker logic
> for
> > > > > moving
> > > > > > > > > replica
> > > > > > > > > > > data
> > > > > > > > > > > > > > >> between
> > > > > > > > > > > > > > >> > > log
> > > > > > > > > > > > > > >> > > > > > directories" to make it easier to
> > read.
> > > > You
> > > > > > can
> > > > > > > > find
> > > > > > > > > > the
> > > > > > > > > > > > > exact
> > > > > > > > > > > > > > >> > change
> > > > > > > > > > > > > > >> > > > > here
> > > > > > > > > > > > > > >> > > > > > <https://cwiki.apache.org/conf
> > > > > > > > > > > > > luence/pages/diffpagesbyversio
> > > > > > > > > > > > > > >> > > > > > n.action?pageId=67638408&selec
> > > > > > > > > > > > > tedPageVersions=5&selectedPage
> > > > > > > > > > > > > > >> > > > Versions=6>.
> > > > > > > > > > > > > > >> > > > > > The idea is to use the same
> > replication
> > > > > quota
> > > > > > > > > > mechanism
> > > > > > > > > > > > > > >> introduced
> > > > > > > > > > > > > > >> > in
> > > > > > > > > > > > > > >> > > > > > KIP-73.
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > > > >> > > > > > Dong
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM,
> Alexey
> > > > > > > Ozeritsky <
> > > > > > > > > > > > > > >> > > aozerit...@yandex.ru
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > > wrote:
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <
> > > > > > > > > lindon...@gmail.com
> > > > > > > > > > >:
> > > > > > > > > > > > > > >> > > > > > > > Hey Alexey,
> > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > Thanks. I think we agreed that
> the
> > > > > > suggested
> > > > > > > > > > > solution
> > > > > > > > > > > > > > >> doesn't
> > > > > > > > > > > > > > >> > > work
> > > > > > > > > > > > > > >> > > > in
> > > > > > > > > > > > > > >> > > > > > > > general for kafka users. To
> answer
> > > > your
> > > > > > > > > questions:
> > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > 1. I agree we need quota to rate
> > > limit
> > > > > > > replica
> > > > > > > > > > > > movement
> > > > > > > > > > > > > > >> when a
> > > > > > > > > > > > > > >> > > > broker
> > > > > > > > > > > > > > >> > > > > > is
> > > > > > > > > > > > > > >> > > > > > > > moving a "leader" replica. I
> will
> > > come
> > > > > up
> > > > > > > with
> > > > > > > > > > > > solution,
> > > > > > > > > > > > > > >> > probably
> > > > > > > > > > > > > > >> > > > > > re-use
> > > > > > > > > > > > > > >> > > > > > > > the config of replication quota
> > > > > introduced
> > > > > > > in
> > > > > > > > > > > KIP-73.
> > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > 2. Good point. I agree that this
> > is
> > > a
> > > > > > > problem
> > > > > > > > in
> > > > > > > > > > > > > general.
> > > > > > > > > > > > > > > >> If there is
> > > > > > > > > > > > > > >> > > no
> > > > > > > > > > > > > > >> > > > > new
> > > > > > > > > > > > > > >> > > > > > > data
> > > > > > > > > > > > > > >> > > > > > > > on that broker, with current
> > default
> > > > > value
> > > > > > > of
> > > > > > > > > > > > > > >> > > > > > replica.fetch.wait.max.ms
> > > > > > > > > > > > > > >> > > > > > > > and replica.fetch.max.bytes, the
> > > > replica
> > > > > > > will
> > > > > > > > be
> > > > > > > > > > > moved
> > > > > > > > > > > > > at
> > > > > > > > > > > > > > >> only
> > > > > > > > > > > > > > >> > 2
> > > > > > > > > > > > > > >> > > > MBps
> > > > > > > > > > > > > > >> > > > > > > > throughput. I think the solution
> > is
> > > > for
> > > > > > > broker
> > > > > > > > > to
> > > > > > > > > > > set
> > > > > > > > > > > > > > >> > > > > > > > replica.fetch.wait.max.ms to 0
> in
> > > its
> > > > > > > > > > FetchRequest
> > > > > > > > > > > if
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> > > > > > corresponding
> > > > > > > > > > > > > > >> > > > > > > > ReplicaFetcherThread needs to
> move
> > > > some
> > > > > > > > replica
> > > > > > > > > to
> > > > > > > > > > > > > another
> > > > > > > > > > > > > > >> > disk.
> > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > 3. I have updated the KIP to
> > mention
> > > > > that
> > > > > > > the
> > > > > > > > > read
> > > > > > > > > > > > size
> > > > > > > > > > > > > > of a
> > > > > > > > > > > > > > >> > > given
> > > > > > > > > > > > > > >> > > > > > > > partition is configured using
> > > > > > > > > > > replica.fetch.max.bytes
> > > > > > > > > > > > > when
> > > > > > > > > > > > > > >> we
> > > > > > > > > > > > > > >> > > move
> > > > > > > > > > > > > > >> > > > > > > replicas
> > > > > > > > > > > > > > >> > > > > > > > between disks.
> > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > Please see this
> > > > > > > > > > > > > > >> > > > > > > > <https://cwiki.apache.org/conf
> > > > > > > > > > > > > > >> luence/pages/diffpagesbyversio
> > > > > > > > > > > > > > >> > > > n.action
> > > > > > > > > > > > > > >> > > > > ?
> > > > > > > > > > > > > > >> > > > > > > pageId=67638408&selectedPageVe
> > > > > > > > > > > > > > rsions=4&selectedPageVersions=
> > > > > > > > > > > > > > >> 5>
> > > > > > > > > > > > > > >> > > > > > > > for the change of the KIP. I
> will
> > > come
> > > > > up
> > > > > > > > with a
> > > > > > > > > > > > > solution
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > >> > > > throttle
> > > > > > > > > > > > > > >> > > > > > > > replica movement when a broker
> is
> > > > > moving a
> > > > > > > > > > "leader"
> > > > > > > > > > > > > > replica.
> > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > >> > > > > > > Thanks. It looks great.
> > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM,
> > > > Alexey
> > > > > > > > > Ozeritsky
> > > > > > > > > > <
> > > > > > > > > > > > > > >> > > > > > aozerit...@yandex.ru>
> > > > > > > > > > > > > > >> > > > > > > > wrote:
> > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > >>  23.01.2017, 22:11, "Dong Lin"
> <
> > > > > > > > > > > lindon...@gmail.com
> > > > > > > > > > > > >:
> > > > > > > > > > > > > > >> > > > > > > >>  > Thanks. Please see my
> comment
> > > > > inline.
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > >> > > > > > > >>  > On Mon, Jan 23, 2017 at 6:45
> > AM,
> > > > > > Alexey
> > > > > > > > > > > Ozeritsky
> > > > > > > > > > > > <
> > > > > > > > > > > > > > >> > > > > > > aozerit...@yandex.ru>
> > > > > > > > > > > > > > >> > > > > > > >>  > wrote:
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > >> > > > > > > >>  >> 13.01.2017, 22:29, "Dong
> > Lin" <
> > > > > > > > > > > > lindon...@gmail.com
> > > > > > > > > > > > > >:
> > > > > > > > > > > > > > >> > > > > > > >>  >> > Hey Alexey,
> > > > > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > > > > >> > > > > > > >>  >> > Thanks for your review
> and
> > > the
> > > > > > > > > alternative
> > > > > > > > > > > > > > approach.
> > > > > > > > > > > > > > >> > Here
> > > > > > > > > > > > > > >> > > is
> > > > > > > > > > > > > > >> > > > > my
> > > > > > > > > > > > > > >> > > > > > > >>  >> > understanding of your
> > patch.
> > > > > > kafka's
> > > > > > > > > > > background
> > > > > > > > > > > > > > >> threads
> > > > > > > > > > > > > > >> > > are
> > > > > > > > > > > > > > >> > > > > used
> > > > > > > > > > > > > > >> > > > > > > to
> > > > > > > > > > > > > > >> > > > > > > >>  move
> > > > > > > > > > > > > > >> > > > > > > >>  >> > data between replicas.
> When
> > > > data
> > > > > > > > movement
> > > > > > > > > > is
> > > > > > > > > > > > > > >> triggered,
> > > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > > >> > > > > log
> > > > > > > > > > > > > > >> > > > > > > will
> > > > > > > > > > > > > > >> > > > > > > >>  be
> > > > > > > > > > > > > > >> > > > > > > >>  >> > rolled and the new logs
> > will
> > > be
> > > > > put
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > new
> > > > > > > > > > > > > > >> > directory,
> > > > > > > > > > > > > > >> > > > and
> > > > > > > > > > > > > > >> > > > > > > >>  background
> > > > > > > > > > > > > > >> > > > > > > >>  >> > threads will move segment
> > > from
> > > > > old
> > > > > > > > > > directory
> > > > > > > > > > > to
> > > > > > > > > > > > > new
> > > > > > > > > > > > > > >> > > > directory.
> > > > > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > > > > >> > > > > > > >>  >> > It is important to note
> > that
> > > > > > KIP-112
> > > > > > > is
> > > > > > > > > > > > intended
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > >> work
> > > > > > > > > > > > > > >> > > > with
> > > > > > > > > > > > > > >> > > > > > > >>  KIP-113 to
> > > > > > > > > > > > > > >> > > > > > > >>  >> > support JBOD. I think
> your
> > > > > solution
> > > > > > > is
> > > > > > > > > > > > definitely
> > > > > > > > > > > > > > >> > simpler
> > > > > > > > > > > > > > >> > > > and
> > > > > > > > > > > > > > >> > > > > > > better
> > > > > > > > > > > > > > >> > > > > > > >>  >> under
> > > > > > > > > > > > > > >> > > > > > > >>  >> > the current kafka
> > > > implementation
> > > > > > > that a
> > > > > > > > > > > broker
> > > > > > > > > > > > > will
> > > > > > > > > > > > > > >> fail
> > > > > > > > > > > > > > >> > > if
> > > > > > > > > > > > > > >> > > > > any
> > > > > > > > > > > > > > >> > > > > > > disk
> > > > > > > > > > > > > > >> > > > > > > >>  >> fails.
> > > > > > > > > > > > > > >> > > > > > > >>  >> > But I am not sure if we
> > want
> > > to
> > > > > > allow
> > > > > > > > > > broker
> > > > > > > > > > > to
> > > > > > > > > > > > > run
> > > > > > > > > > > > > > >> with
> > > > > > > > > > > > > > >> > > > > partial
> > > > > > > > > > > > > > >> > > > > > > >>  disks
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > failure. Let's say a
> > > > replica
> > > > > is
> > > > > > > > being
> > > > > > > > > > > moved
> > > > > > > > > > > > > > from
> > > > > > > > > > > > > > >> > > > > log_dir_old
> > > > > > > > > > > > > > >> > > > > > > to
> > > > > > > > > > > > > > >> > > > > > > >>  >> > log_dir_new and then
> > > > log_dir_old
> > > > > > > stops
> > > > > > > > > > > working
> > > > > > > > > > > > > due
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > >> > disk
> > > > > > > > > > > > > > >> > > > > > > failure.
> > > > > > > > > > > > > > >> > > > > > > >>  How
> > > > > > > > > > > > > > >> > > > > > > >>  >> > would your existing patch
> > > > > handle
> > > > > > it?
> > > > > > > > To
> > > > > > > > > > make
> > > > > > > > > > > > the
> > > > > > > > > > > > > > >> > > scenario a
> > > > > > > > > > > > > > >> > > > > bit
> > > > > > > > > > > > > > >> > > > > > > more
> > > > > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > > > > >> > > > > > > >>  >> We will lose log_dir_old.
> > After
> > > > > > broker
> > > > > > > > > > restart
> > > > > > > > > > > we
> > > > > > > > > > > > > can
> > > > > > > > > > > > > > >> read
> > > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > > >> > > > > > data
> > > > > > > > > > > > > > >> > > > > > > >>  from
> > > > > > > > > > > > > > >> > > > > > > >>  >> log_dir_new.
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > >> > > > > > > >>  > No, you probably can't. This
> > is
> > > > > > because
> > > > > > > > the
> > > > > > > > > > > broker
> > > > > > > > > > > > > > >> doesn't
> > > > > > > > > > > > > > >> > > have
> > > > > > > > > > > > > > >> > > > > > > *all* the
> > > > > > > > > > > > > > >> > > > > > > >>  > data for this partition. For
> > > > > example,
> > > > > > > say
> > > > > > > > > the
> > > > > > > > > > > > broker
> > > > > > > > > > > > > > has
> > > > > > > > > > > > > > > >> > > > > > > >>  > partition_segment_1,
> > > > > > > partition_segment_50
> > > > > > > > > and
> > > > > > > > > > > > > > >> > > > > > partition_segment_100
> > > > > > > > > > > > > > >> > > > > > > on
> > > > > > > > > > > > > > >> > > > > > > >>  the
> > > > > > > > > > > > > > >> > > > > > > >>  > log_dir_old.
> > > > partition_segment_100,
> > > > > > > which
> > > > > > > > > has
> > > > > > > > > > > the
> > > > > > > > > > > > > > latest
> > > > > > > > > > > > > > >> > > data,
> > > > > > > > > > > > > > >> > > > > has
> > > > > > > > > > > > > > >> > > > > > > been
> > > > > > > > > > > > > > >> > > > > > > >>  > moved to log_dir_new, and
> the
> > > > > > > log_dir_old
> > > > > > > > > > fails
> > > > > > > > > > > > > before
> > > > > > > > > > > > > > >> > > > > > > >>  partition_segment_50
> > > > > > > > > > > > > > > >> > > > > > > >>  > and partition_segment_1 are
> > moved
> > > > to
> > > > > > > > > > log_dir_new.
> > > > > > > > > > > > > When
> > > > > > > > > > > > > > >> > broker
> > > > > > > > > > > > > > >> > > > > > > re-starts,
> > > > > > > > > > > > > > >> > > > > > > >>  it
> > > > > > > > > > > > > > >> > > > > > > >>  > won't have
> > partition_segment_50.
> > > > > This
> > > > > > > > causes
> > > > > > > > > > > > problem
> > > > > > > > > > > > > > if
> > > > > > > > > > > > > > >> > > broker
> > > > > > > > > > > > > > >> > > > is
> > > > > > > > > > > > > > >> > > > > > > elected
> > > > > > > > > > > > > > >> > > > > > > >>  > leader and consumer wants to
> > > > consume
> > > > > > > data
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > > > > >> > > > > > partition_segment_1.
> > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > >> > > > > > > >>  Right.
> > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > >> > > > > > > >>  >> > complicated, let's say
> the
> > > > broker
> > > > > > is
> > > > > > > > > > > > shutdown,
> > > > > > > > > > > > > > >> > > log_dir_old's
> > > > > > > > > > > > > > >> > > > > > disk
> > > > > > > > > > > > > > >> > > > > > > >>  fails,
> > > > > > > > > > > > > > >> > > > > > > >>  >> > and the broker starts. In
> > > this
> > > > > case
> > > > > > > > > broker
> > > > > > > > > > > > > doesn't
> > > > > > > > > > > > > > >> even
> > > > > > > > > > > > > > >> > > know
> > > > > > > > > > > > > > >> > > > > if
> > > > > > > > > > > > > > >> > > > > > > >>  >> log_dir_new
> > > > > > > > > > > > > > >> > > > > > > >>  >> > has all the data needed
> for
> > > > this
> > > > > > > > replica.
> > > > > > > > > > It
> > > > > > > > > > > > > > becomes
> > > > > > > > > > > > > > >> a
> > > > > > > > > > > > > > >> > > > problem
> > > > > > > > > > > > > > >> > > > > > if
> > > > > > > > > > > > > > >> > > > > > > the
> > > > > > > > > > > > > > >> > > > > > > >>  >> > broker is elected leader
> of
> > > > this
> > > > > > > > > partition
> > > > > > > > > > in
> > > > > > > > > > > > > this
> > > > > > > > > > > > > > >> case.
> > > > > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > > > > > >> > > > > > > >>  >> log_dir_new contains the most recent data, so we will lose
> > > > > > > > > > > > > > > >> > > > > > > >>  >> the tail of the partition.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> This is not a big problem for us because we already delete
> > > > > > > > > > > > > > > >> > > > > > > >>  >> tails by hand (see
> > > > > > > > > > > > > > > >> > > > > > > >>  >> https://issues.apache.org/jira/browse/KAFKA-1712).
> > > > > > > > > > > > > > > >> > > > > > > >>  >> Also, we don't use automatic leader balancing
> > > > > > > > > > > > > > > >> > > > > > > >>  >> (auto.leader.rebalance.enable=false), so this partition
> > > > > > > > > > > > > > > >> > > > > > > >>  >> becomes the leader with a low probability.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> I think my patch can be modified to prohibit the selection
> > > > > > > > > > > > > > > >> > > > > > > >>  >> of the leader until the partition has been moved completely.
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > > >> > > > > > > >>  > I guess you are saying that you have deleted the tails by
> > > > > > > > > > > > > > > >> > > > > > > >>  > hand in your own Kafka branch. But KAFKA-1712 is not accepted
> > > > > > > > > > > > > > > >> > > > > > > >>  > into Kafka trunk and I am not
> > > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > > >> > > > > > > >>  No. We just modify the segments' mtime with a cron job. This
> > > > > > > > > > > > > > > >> > > > > > > >>  works with vanilla Kafka.
> > > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > > >> > > > > > > >>  > sure if it is the right solution. How would this solution
> > > > > > > > > > > > > > > >> > > > > > > >>  > address the problem mentioned above?
> > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > > >> > > > > > > >>  If you need only fresh data and you remove old data by hand,
> > > > > > > > > > > > > > > >> > > > > > > >>  this is not a problem. But in the general case this is a
> > > > > > > > > > > > > > > >> > > > > > > >>  problem, of course.
> > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > > >> > > > > > > >>  > BTW, I am not sure the solution mentioned in KAFKA-1712 is the
> > > > > > > > > > > > > > > >> > > > > > > >>  > right way to address its problem. Now that we have a timestamp
> > > > > > > > > > > > > > > >> > > > > > > >>  > in the message, we can use that to delete old segments instead
> > > > > > > > > > > > > > > >> > > > > > > >>  > of relying on the log segment mtime. Just an idea, and we
> > > > > > > > > > > > > > > >> > > > > > > >>  > don't have to discuss this problem here.
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > The solution presented in the KIP attempts to handle it by
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > replacing the replica in an atomic fashion after the log in
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > the new dir has fully caught up with the log in the old dir.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > At that time the log can be considered to exist in only one
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > log directory.
> > > > > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > > > > > >> > > > > > > >>  >> As I understand it, your solution does not cover quotas.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> What happens if someone starts to transfer 100 partitions?
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > > >> > > > > > > >>  > Good point. Quota can be implemented in the future. It is
> > > > > > > > > > > > > > > >> > > > > > > >>  > currently mentioned as a potential future improvement in
> > > > > > > > > > > > > > > >> > > > > > > >>  > KIP-112
> > > > > > > > > > > > > > > >> > > > > > > >>  > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>.
> > > > > > > > > > > > > > > >> > > > > > > >>  > Thanks for the reminder. I will move it to KIP-113.
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > If yes, it will read a ByteBufferMessageSet from
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > topicPartition.log and append the message set to
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > topicPartition.move
> > > > > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > > > > > >> > > > > > > >>  >> i.e. processPartitionData will read data from the beginning
> > > > > > > > > > > > > > > >> > > > > > > >>  >> of topicPartition.log? What is the read size?
> > > > > > > > > > > > > > > >> > > > > > > >>  >> A ReplicaFetchThread reads many partitions, so if one does
> > > > > > > > > > > > > > > >> > > > > > > >>  >> some complicated work (= reads a lot of data from disk)
> > > > > > > > > > > > > > > >> > > > > > > >>  >> everything will slow down.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> I think the read size should not be very big.
> > > > > > > > > > > > > > >> > > > > > > >>  >>
> > > > > > > > > > > > > > > >> > > > > > > >>  >> On the other hand, at this point (processPartitionData) one
> > > > > > > > > > > > > > > >> > > > > > > >>  >> can use only the new data (ByteBufferMessageSet from
> > > > > > > > > > > > > > > >> > > > > > > >>  >> parameters) and wait until
> > > > > > > > > > > > > > > >> > > > > > > >>  >> (topicPartition.move.smallestOffset <=
> > > > > > > > > > > > > > > >> > > > > > > >>  >> topicPartition.log.smallestOffset &&
> > > > > > > > > > > > > > > >> > > > > > > >>  >> topicPartition.move.largestOffset ==
> > > > > > > > > > > > > > > >> > > > > > > >>  >> topicPartition.log.largestOffset).
> > > > > > > > > > > > > > > >> > > > > > > >>  >> In this case the write speed to topicPartition.move and
> > > > > > > > > > > > > > > >> > > > > > > >>  >> topicPartition.log will be the same, so this will allow us to
> > > > > > > > > > > > > > > >> > > > > > > >>  >> move many partitions to one disk.
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > > >> > > > > > > >>  > The read size of a given partition is configured using
> > > > > > > > > > > > > > > >> > > > > > > >>  > replica.fetch.max.bytes, which is the same size used by
> > > > > > > > > > > > > > > >> > > > > > > >>  > FetchRequest from follower to leader. If the broker is moving
> > > > > > > > > > > > > > > >> > > > > > > >>  > a replica for which it
> > > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > > >> > > > > > > >>  OK. Could you mention it in the KIP?
> > > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > > >> > > > > > > >>  > acts as a follower, the disk write rate for moving this
> > > > > > > > > > > > > > > >> > > > > > > >>  > replica is at most the rate it fetches from the leader (assume
> > > > > > > > > > > > > > > >> > > > > > > >>  > it is catching up and has sufficient data to read from the
> > > > > > > > > > > > > > > >> > > > > > > >>  > leader), which is subject to round-trip-time between itself
> > > > > > > > > > > > > > > >> > > > > > > >>  > and the leader. Thus this part is probably fine even without
> > > > > > > > > > > > > > > >> > > > > > > >>  > quota.
> > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > > >> > > > > > > >>  I think there are 2 problems:
> > > > > > > > > > > > > > > >> > > > > > > >>  1. Without a speed limiter this will not work well even for 1
> > > > > > > > > > > > > > > >> > > > > > > >>  partition. In our production we had a problem, so we added a
> > > > > > > > > > > > > > > >> > > > > > > >>  throughput limiter:
> > > > > > > > > > > > > > > >> > > > > > > >>  https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
> > > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > > >> > > > > > > >>  2. I don't understand how it will work in the case of a big
> > > > > > > > > > > > > > > >> > > > > > > >>  replica.fetch.wait.max.ms and a partition with irregular flow.
> > > > > > > > > > > > > > > >> > > > > > > >>  For example, someone could have replica.fetch.wait.max.ms = 10
> > > > > > > > > > > > > > > >> > > > > > > >>  minutes and a partition that has very high data flow from
> > > > > > > > > > > > > > > >> > > > > > > >>  12:00 to 13:00 and zero flow otherwise.
> > > > > > > > > > > > > > > >> > > > > > > >>  In this case processPartitionData could be called once per 10
> > > > > > > > > > > > > > > >> > > > > > > >>  minutes, so if we start moving data at 13:01 it will finish
> > > > > > > > > > > > > > > >> > > > > > > >>  the next day.
> > > > > > > > > > > > > > >> > > > > > > >>
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > > >> > > > > > > >>  > But if the broker is moving a replica for which it acts as a
> > > > > > > > > > > > > > > >> > > > > > > >>  > leader, as of the current KIP the broker will keep reading
> > > > > > > > > > > > > > > >> > > > > > > >>  > from log_dir_old and appending to log_dir_new without having
> > > > > > > > > > > > > > > >> > > > > > > >>  > to wait for round-trip-time. We probably need quota for this
> > > > > > > > > > > > > > > >> > > > > > > >>  > in the future.
> > > > > > > > > > > > > > >> > > > > > > >>  >
> > > > > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > And to answer your question, yes, topicPartition.log refers
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > to topic-partition/segment.log.
> > > > > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > > > > >> > > > > > > >>  >> > Thanks,
> > > > > > > > > > > > > > >> > > > > > > >>  >> > Dong
> > > > > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky
> > > > > > > > > > > > > > > >> > > > > > > >>  >> > <aozerit...@yandex.ru> wrote:
> > > > > > > > > > > > > > >> > > > > > > >>  >> >
> > > > > > > > > > > > > > >> > > > > > > >>  >> >> Hi,
> > > > > > > > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> We have a similar solution that has been working in
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> production since 2014. You can see it here:
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> The idea is very simple:
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> 1. Disk balancer runs in a separate thread inside the
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> scheduler pool.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> 2. It does not touch empty partitions.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> 3. Before it moves a partition, it forcibly creates a new
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> segment on the destination disk.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> 4. It moves segment by segment from new to old.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> 5. The Log class works with segments on both disks.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> Your approach seems too complicated; moreover, it means
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> that you have to patch different components of the system.
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> Could you clarify what you mean by topicPartition.log? Is
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> it topic-partition/segment.log?
> > > > > > > > > > > > > > >> > > > > > > >>  >> >>
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
> > > > > > > > > > > > > > >> > > > > > > >>  >> >> > Hi all,
> > > > > > > > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > We created KIP-113: Support replicas movement between log
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > directories. Please find the KIP wiki in the link
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.*
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > This KIP is related to KIP-112
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > Handle disk failure for JBOD. They are needed in order to
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > support JBOD in Kafka. Please help review the KIP. Your
> > > > > > > > > > > > > > > >> > > > > > > >>  >> >> > feedback is appreciated!
> > > > > > > > > > > > > > >> > > > > > > >>  >> >> >
> > > > > > > > > > > > > > >> > > > > > > >>  >> >> > Thanks,
> > > > > > > > > > > > > > >> > > > > > > >>  >> >> > Dong
> > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
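
The catch-up condition proposed in the quoted thread (wait until the copy in
the new directory covers the same offsets as the original before swapping) can
be sketched in Python as below. This is only an illustration: LogRange and
move_caught_up are hypothetical names, not Kafka APIs, and the sketch assumes
the second clause of the condition is meant to compare the largest offset of
topicPartition.move with that of topicPartition.log.

```python
from dataclasses import dataclass


@dataclass
class LogRange:
    """Offsets covered by one copy of a partition's log (hypothetical type)."""
    smallest_offset: int
    largest_offset: int


def move_caught_up(move: LogRange, log: LogRange) -> bool:
    """Return True when the copy being built (move) covers the original (log).

    Mirrors the condition from the thread:
    move.smallestOffset <= log.smallestOffset and
    move.largestOffset == log.largestOffset (second clause is an assumption).
    """
    return (move.smallest_offset <= log.smallest_offset
            and move.largest_offset == log.largest_offset)


# Example: the original log currently holds offsets 100..500.
original = LogRange(smallest_offset=100, largest_offset=500)
print(move_caught_up(LogRange(100, 450), original))  # tail not copied yet
print(move_caught_up(LogRange(100, 500), original))  # same range as original
```

Under this idea, new data would keep being appended to both copies, and the
directories would be swapped only once the check returns True.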
