Hi, Dong,

Thanks for the reply.

1.3 So the thread gets the lock, checks whether it has caught up, and
releases the lock if not? Then, when there is continuous incoming data, the
thread may never get a chance to swap. One way to address this is, when the
thread is getting really close to catching up, to just hold onto the lock
until the thread fully catches up.
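
To make the idea concrete, here is a rough sketch, not a proposal for the
actual broker code. It assumes in-memory lists standing in for the two logs,
and the class name, the closeEnough threshold and the batch size are all
hypothetical. The mover copies batches with the lock held only briefly, and
once the lag is small it keeps the lock through the final catch-up and the
swap, so appenders cannot starve it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: lists stand in for the source and destination logs.
final class ReplicaSwapSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private List<Long> active = new ArrayList<>();        // log currently serving appends
    private final List<Long> moving = new ArrayList<>();  // copy in the destination dir
    private final int closeEnough;                        // assumed lag threshold
    private final int batchSize;                          // assumed copy batch size
    private boolean swapped = false;

    ReplicaSwapSketch(int closeEnough, int batchSize) {
        this.closeEnough = closeEnough;
        this.batchSize = batchSize;
    }

    // Append path (request handler or fetcher thread): blocked while the mover holds the lock.
    void append(long record) {
        lock.lock();
        try {
            active.add(record);
        } finally {
            lock.unlock();
        }
    }

    // Mover thread: copy in batches, then hold the lock for the last stretch and swap.
    void moveUntilSwapped() {
        while (true) {
            List<Long> batch;
            lock.lock();
            try {
                int lag = active.size() - moving.size();
                if (lag <= closeEnough) {
                    // Close enough: finish catching up while holding the lock, then swap.
                    moving.addAll(active.subList(moving.size(), active.size()));
                    active = moving;
                    swapped = true;
                    return;
                }
                int end = Math.min(active.size(), moving.size() + batchSize);
                batch = new ArrayList<>(active.subList(moving.size(), end));
            } finally {
                lock.unlock();
            }
            // The slow copy happens outside the lock, so appends can continue meanwhile.
            moving.addAll(batch);
        }
    }

    boolean isSwapped() {
        lock.lock();
        try {
            return swapped;
        } finally {
            lock.unlock();
        }
    }

    int activeSize() {
        lock.lock();
        try {
            return active.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReplicaSwapSketch replica = new ReplicaSwapSketch(4, 16);
        for (long i = 0; i < 100; i++) {
            replica.append(i);
        }
        replica.moveUntilSwapped();
        System.out.println("swapped=" + replica.isSwapped() + " size=" + replica.activeSize());
    }
}
```

The threshold trades off how long appends are blocked against how quickly the
swap is guaranteed to happen.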

2.3 So, you are saying that the partition reassignment tool can first send
a ChangeReplicaDirRequest to relevant brokers to establish the log dir for
replicas not created yet, then trigger the partition movement across
brokers through the controller? That's actually a good idea. Then, we can
just leave LeaderAndIsrRequest as it is. One more thing related to
ChangeReplicaDirRequest: since this request may take a long time to
complete, I am not sure if we should wait for the movement to complete
before responding. While waiting for the movement to complete, the idle
connection may be killed or the client may be gone already. An alternative
is to return immediately and add a new request like CheckReplicaDirRequest
to see if the movement has completed. The tool can take advantage of that
to check the status.
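
Roughly, the return-immediately-and-poll pattern could look like the sketch
below. This is only an illustration under my own assumptions: the class,
enum and method names are hypothetical stand-ins for the broker-side handling
of ChangeReplicaDirRequest and CheckReplicaDirRequest, and a Runnable stands
in for the actual data copy.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "start the move, return at once, poll for status".
final class ReplicaDirMoveSketch {
    enum Status { UNKNOWN, IN_PROGRESS, COMPLETED, FAILED }

    private final Map<String, Status> status = new ConcurrentHashMap<>();
    private final ExecutorService movers = Executors.newFixedThreadPool(2);

    // Stand-in for handling ChangeReplicaDirRequest: record the move and return immediately.
    void changeReplicaDir(String topicPartition, Runnable copyWork) {
        status.put(topicPartition, Status.IN_PROGRESS);
        movers.submit(() -> {
            try {
                copyWork.run();
                status.put(topicPartition, Status.COMPLETED);
            } catch (RuntimeException e) {
                status.put(topicPartition, Status.FAILED);
            }
        });
    }

    // Stand-in for handling CheckReplicaDirRequest: a cheap status lookup.
    Status checkReplicaDir(String topicPartition) {
        return status.getOrDefault(topicPartition, Status.UNKNOWN);
    }

    // Tool side: poll until the move leaves IN_PROGRESS or the timeout expires.
    Status pollUntilDone(String topicPartition, long intervalMs, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        Status current = checkReplicaDir(topicPartition);
        while (current == Status.IN_PROGRESS && System.nanoTime() < deadline) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return checkReplicaDir(topicPartition);
            }
            current = checkReplicaDir(topicPartition);
        }
        return current;
    }

    void shutdown() {
        movers.shutdown();
    }
}
```

With this shape no connection stays idle while data is being copied, and a
tool that restarts can simply resume polling.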

Thanks,

Jun



On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks for the detailed explanation. I will use the separate thread pool to
> move replica between log directories. I will let you know when the KIP has
> been updated to use a separate thread pool.
>
> Here is my response to your other questions:
>
> 1.3 My idea is that the ReplicaMoveThread that moves data should get the
> lock before checking whether the replica in the destination log directory
> has caught up. If the new replica has caught up, then the ReplicaMoveThread
> should swap the replica while it is still holding the lock. The
> ReplicaFetcherThread or RequestHandlerThread will not be able to append
> data to the replica in the source log directory during this period because
> they cannot get the lock. Does this address the problem?
>
> 2.3 I get your point that we want to keep controller simpler. If admin tool
> can send ChangeReplicaDirRequest to move data within a broker, then
> controller probably doesn't even need to include log directory path in the
> LeaderAndIsrRequest. How about this: controller will only deal with
> reassignment across brokers as it does now. If the user specified a
> destination log directory for any replica, the admin tool will send
> ChangeReplicaDirRequest and wait for the response from the broker to confirm
> that all replicas have been moved to the destination log directory. The
> broker will put ChangeReplicaDirRequest in a purgatory and respond either
> when the movement is completed or when the request has timed out.
>
> 4. I agree that we can expose these metrics via JMX. But I am not sure if
> it can be obtained easily with good performance using either existing tools
> or new script in kafka. I will ask SREs for their opinion.
>
> Thanks,
> Dong
>
>
>
>
>
>
>
>
> On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Thanks for the updated KIP. A few more comments below.
> >
> > 1.1 and 1.2: I am still not sure there is enough benefit of reusing
> > ReplicaFetchThread to move data across disks.
> > (a) A big part of ReplicaFetchThread is to deal with issuing and tracking
> > fetch requests. So, it doesn't feel that we get much from reusing
> > ReplicaFetchThread only to disable the fetching part.
> > (b) The leader replica has no ReplicaFetchThread to start with. It feels
> > weird to start one just for intra broker data movement.
> > (c) The ReplicaFetchThread is per broker. Intuitively, the number of
> > threads doing intra broker data movement should be related to the number
> > of disks in the broker, not the number of brokers in the cluster.
> > (d) If the destination disk fails, we want to stop the intra broker data
> > movement, but want to continue inter broker replication. So, logically,
> > it seems it's better to separate out the two.
> > (e) I am also not sure if we should reuse the existing throttling for
> > replication. It's designed to handle traffic across brokers and the
> > delaying is done in the fetch request. So, if we are not doing
> > fetching in ReplicaFetchThread, I am not sure the existing throttling is
> > effective. Also, when specifying the throttling of moving data across
> > disks, it seems the user shouldn't care about whether a replica is a
> > leader or a follower. Reusing the existing throttling config name will be
> > awkward in this regard.
> > (f) It seems it's simpler and more consistent to use a separate thread
> > pool for local data movement (for both leader and follower replicas). This
> > process can then be configured (e.g. number of threads, etc) and throttled
> > independently.
> >
> > 1.3 Yes, we will need some synchronization there. So, if the movement
> > thread catches up, gets the lock to do the swap, but realizes that new
> > data is added, it has to continue catching up while holding the lock?
> >
> > 2.3 The benefit of including the desired log directory in
> > LeaderAndIsrRequest during partition reassignment is that the controller
> > doesn't need to track the progress for disk movement. So, you don't need
> > the additional BrokerDirStateUpdateRequest. Then the controller never
> > needs to issue ChangeReplicaDirRequest. Only the admin tool will issue
> > ChangeReplicaDirRequest to move data within a broker. I agree that this
> > makes LeaderAndIsrRequest more complicated, but that seems simpler than
> > changing the controller to track additional states during partition
> > reassignment.
> >
> > 4. We want to make a decision on how to expose the stats. So far, we are
> > exposing stats like the individual log size as JMX. So, one way is to
> > just add new jmx to expose the log directory of individual replicas.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for all the comments! Please see my answer below. I have updated
> > the
> > > KIP to address most of the questions and make the KIP easier to
> > understand.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the KIP. A few comments below.
> > > >
> > > > 1. For moving data across directories
> > > > 1.1 I am not sure why we want to use ReplicaFetcherThread to move
> data
> > > > around in the leader. ReplicaFetchThread fetches data from socket.
> For
> > > > moving data locally, it seems that we want to avoid the socket
> > overhead.
> > > >
> > >
> > > The purpose of using ReplicaFetchThread is to re-use existing thread
> > > instead of creating more threads and make our thread model more
> complex.
> > It
> > > seems like a natural choice for copying data between disks since it is
> > > similar to copying data between brokers. Another reason is that if the
> > > replica to be moved is a follower, we don't need lock to swap replicas
> > when
> > > destination replica has caught up, since the same thread which is
> > fetching
> > > data from leader will swap the replica.
> > >
> > > The ReplicaFetchThread will not incur socket overhead while copying
> data
> > > between disks. It will read directly from source disk (as we do when
> > > processing FetchRequest) and write to destination disk (as we do when
> > > processing ProduceRequest).
> > >
> > >
> > > > 1.2 I am also not sure about moving data in the ReplicaFetcherThread
> in
> > > the
> > > > follower. For example, I am not sure setting replica.fetch.max.wait
> to
> > 0
> > > >  is ideal. It may not always be effective since a fetch request in
> the
> > > > ReplicaFetcherThread could be arbitrarily delayed due to replication
> > > > throttling on the leader. In general, the data movement logic across
> > > disks
> > > > seems different from that in ReplicaFetcherThread. So, I am not sure
> > why
> > > > they need to be coupled.
> > > >
> > >
> > > While it may not be the most efficient way to copy data between local
> > > disks, it will be at least as efficient as copying data from leader to
> > the
> > > destination disk. The expected goal of KIP-113 is to enable data
> movement
> > > between disks with no less efficiency than what we do now when moving
> > data
> > > between brokers. I think we can optimize its performance using separate
> > > thread if the performance is not good enough.
> > >
> > >
> > > > 1.3 Could you add a bit more details on how we swap the replicas when
> > the
> > > > new ones are fully caught up? For example, what happens when the new
> > > > replica in the new log directory is caught up, but when we want to do
> > the
> > > > swap, some new data has arrived?
> > > >
> > >
> > > If the replica is a leader, then ReplicaFetcherThread will perform the
> > > replacement. Proper lock is needed to prevent KafkaRequestHandler from
> > > appending data to the topicPartition.log on the source disks before
> this
> > > replacement is completed by ReplicaFetcherThread.
> > >
> > > If the replica is a follower, because the same ReplicaFetchThread which
> > > fetches data from leader will also swap the replica , no lock is
> needed.
> > >
> > > I have updated the KIP to specify both more explicitly.
> > >
> > >
> > >
> > > > 1.4 Do we need to do the .move at the log segment level or could we
> > just
> > > do
> > > > that at the replica directory level? Renaming just a directory is
> much
> > > > faster than renaming the log segments.
> > > >
> > >
> > > Great point. I have updated the KIP to rename the log directory
> instead.
> > >
> > >
> > > > 1.5 Could you also describe a bit what happens when either the source
> > or
> > > > the target log directory fails while the data moving is in progress?
> > > >
> > >
> > > If source log directory fails, then the replica movement will stop and
> > the
> > > source replica is marked offline. If destination log directory fails,
> > then
> > > the replica movement will stop. I have updated the KIP to clarify this.
> > >
> > >
> > > >
> > > > 2. For partition reassignment.
> > > > 2.1 I am not sure if the controller can block on
> > ChangeReplicaDirRequest.
> > > > Data movement may take a long time to complete. If there is an
> > > outstanding
> > > > request from the controller to a broker, that broker won't be able to
> > > > process any new request from the controller. So if another event
> (e.g.
> > > > broker failure) happens when the data movement is in progress,
> > subsequent
> > > > LeaderAndIsrRequest will be delayed.
> > > >
> > >
> > > Yeah good point. I missed the fact that there is only one inflight
> > > request from controller to broker.
> > >
> > > How about I add a request, e.g. BrokerDirStateUpdateRequest, which maps
> > > topicPartition to log directory and can be sent from broker to
> controller
> > > to indicate completion?
> > >
> > >
> > >
> > > > 2.2 in the KIP, the partition reassignment tool is also used for
> cases
> > > > where an admin just wants to balance the existing data across log
> > > > directories in the broker. In this case, it seems that it's overkill to
> > > > have the process go through the controller. A simpler approach is to
> > > issue
> > > > an RPC request to the broker directly.
> > > >
> > >
> > > I agree we can optimize this case. It is just that we have to add new
> > logic
> > > or code path to handle a scenario that is already covered by the more
> > > complicated scenario. I will add it to the KIP.
> > >
> > >
> > > > 2.3 When using the partition reassignment tool to move replicas
> across
> > > > brokers, it makes sense to be able to specify the log directory of the
> > > newly
> > > > created replicas. The KIP does that in two separate requests
> > > > ChangeReplicaDirRequest and LeaderAndIsrRequest, and tracks the
> > progress
> > > of
> > > > each independently. An alternative is to do that just in
> > > > LeaderAndIsrRequest.
> > > > That way, the new replicas will be created in the right log dir in
> the
> > > > first place and the controller just needs to track the progress of
> > > > partition reassignment in the current way.
> > > >
> > >
> > > I agree it is better to use one request instead of two to request
> replica
> > > movement between disks. But I think the performance advantage of doing
> so
> > > is negligible because we trigger replica assignment much less than all
> > > other kinds of events in the Kafka cluster. I am not sure that the
> > benefit
> > > of doing this is worth the effort to add an optional string field in
> the
> > > LeaderAndIsrRequest. Also if we add this optional field in the
> > > LeaderAndIsrRequest, we probably want to remove ChangeReplicaDirRequest
> > to
> > > avoid having two requests doing the same thing. But it means user
> script
> > > can not send request directly to the broker to trigger replica movement
> > > between log directories.
> > >
> > > I will do it if you feel strongly about this optimization.
> > >
> > >
> > > >
> > > > 3. /admin/reassign_partitions: Including the log dir in every replica
> > may
> > > > not be efficient. We could include a list of log directories and
> > > reference
> > > > the index of the log directory in each replica.
> > > >
> > >
> > > Good point. I have updated the KIP to use this solution.
> > >
> > >
> > > >
> > > > 4. DescribeDirsRequest: The stats in the request are already
> available
> > > from
> > > > JMX. Do we need the new request?
> > > >
> > >
> > > Does JMX also include the state (i.e. offline or online) of each log
> > > directory and the log directory of each replica? If not, then maybe we
> > > still need DescribeDirsRequest?
> > >
> > >
> > > >
> > > > 5. We want to be consistent on ChangeReplicaDirRequest vs
> > > > ChangeReplicaRequest.
> > > >
> > >
> > > I think ChangeReplicaRequest and ChangeReplicaResponse are my typos.
> Sorry,
> > > they are fixed now.
> > >
> > >
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Alexey,
> > > > >
> > > > > Thanks for all the comments!
> > > > >
> > > > > I have updated the KIP to specify how we enforce quota. I also
> > updated
> > > > the
> > > > > "The thread model and broker logic for moving replica data between
> > log
> > > > > directories" to make it easier to read. You can find the exact
> change
> > > > here
> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>.
> > > > > The idea is to use the same replication quota mechanism introduced
> in
> > > > > KIP-73.
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <
> > aozerit...@yandex.ru
> > > >
> > > > > wrote:
> > > > >
> > > > > >
> > > > > >
> > > > > > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:
> > > > > > > Hey Alexey,
> > > > > > >
> > > > > > > Thanks. I think we agreed that the suggested solution doesn't
> > work
> > > in
> > > > > > > general for kafka users. To answer your questions:
> > > > > > >
> > > > > > > 1. I agree we need quota to rate limit replica movement when a
> > > broker
> > > > > is
> > > > > > > moving a "leader" replica. I will come up with solution,
> probably
> > > > > re-use
> > > > > > > the config of replication quota introduced in KIP-73.
> > > > > > >
> > > > > > > 2. Good point. I agree that this is a problem in general. If there
> > > > > > > is no new data on that broker, with current default value of
> > > > > replica.fetch.wait.max.ms
> > > > > > > and replica.fetch.max.bytes, the replica will be moved at only
> 2
> > > MBps
> > > > > > > throughput. I think the solution is for broker to set
> > > > > > > replica.fetch.wait.max.ms to 0 in its FetchRequest if the
> > > > > corresponding
> > > > > > > ReplicaFetcherThread needs to move some replica to another
> disk.
> > > > > > >
> > > > > > > 3. I have updated the KIP to mention that the read size of a
> > given
> > > > > > > partition is configured using replica.fetch.max.bytes when we
> > move
> > > > > > replicas
> > > > > > > between disks.
> > > > > > >
> > > > > > > Please see this
> > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5>
> > > > > > > for the change of the KIP. I will come up with a solution to
> > > throttle
> > > > > > > replica movement when a broker is moving a "leader" replica.
> > > > > >
> > > > > > Thanks. It looks great.
> > > > > >
> > > > > > >
> > > > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <
> > > > > aozerit...@yandex.ru>
> > > > > > > wrote:
> > > > > > >
> > > > > > >>  23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:
> > > > > > >>  > Thanks. Please see my comment inline.
> > > > > > >>  >
> > > > > > >>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <
> > > > > > aozerit...@yandex.ru>
> > > > > > >>  > wrote:
> > > > > > >>  >
> > > > > > >>  >> 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:
> > > > > > >>  >> > Hey Alexey,
> > > > > > >>  >> >
> > > > > > >>  >> > Thanks for your review and the alternative approach.
> Here
> > is
> > > > my
> > > > > > >>  >> > understanding of your patch. kafka's background threads
> > are
> > > > used
> > > > > > to
> > > > > > >>  move
> > > > > > >>  >> > data between replicas. When data movement is triggered,
> > the
> > > > log
> > > > > > will
> > > > > > >>  be
> > > > > > >>  >> > rolled and the new logs will be put in the new
> directory,
> > > and
> > > > > > >>  background
> > > > > > >>  >> > threads will move segment from old directory to new
> > > directory.
> > > > > > >>  >> >
> > > > > > >>  >> > It is important to note that KIP-112 is intended to work
> > > with
> > > > > > >>  KIP-113 to
> > > > > > >>  >> > support JBOD. I think your solution is definitely
> simpler
> > > and
> > > > > > better
> > > > > > >>  >> under
> > > > > > >>  >> > the current kafka implementation that a broker will fail
> > if
> > > > any
> > > > > > disk
> > > > > > >>  >> fails.
> > > > > > >>  >> > But I am not sure if we want to allow broker to run with
> > > > partial
> > > > > > >>  disks
> > > > > > >>  >> > failure. Let's say a replica is being moved from
> > > > log_dir_old
> > > > > > to
> > > > > > >>  >> > log_dir_new and then log_dir_old stops working due to
> disk
> > > > > > failure.
> > > > > > >>  How
> > > > > > >>  >> > would your existing patch handle it? To make the
> > scenario a
> > > > bit
> > > > > > more
> > > > > > >>  >>
> > > > > > >>  >> We will lose log_dir_old. After broker restart we can read
> > the
> > > > > data
> > > > > > >>  from
> > > > > > >>  >> log_dir_new.
> > > > > > >>  >
> > > > > > >>  > No, you probably can't. This is because the broker doesn't
> > have
> > > > > > *all* the
> > > > > > >>  > data for this partition. For example, say the broker has
> > > > > > >>  > partition_segment_1, partition_segment_50 and
> > > > > partition_segment_100
> > > > > > on
> > > > > > >>  the
> > > > > > >>  > log_dir_old. partition_segment_100, which has the latest
> > data,
> > > > has
> > > > > > been
> > > > > > >>  > moved to log_dir_new, and the log_dir_old fails before
> > > > > > >>  partition_segment_50
> > > > > > >>  > and partition_segment_1 is moved to log_dir_new. When
> broker
> > > > > > re-starts,
> > > > > > >>  it
> > > > > > >>  > won't have partition_segment_50. This causes problem if
> > broker
> > > is
> > > > > > elected
> > > > > > >>  > leader and consumer wants to consume data in the
> > > > > partition_segment_1.
> > > > > > >>
> > > > > > >>  Right.
> > > > > > >>
> > > > > > >>  >
> > > > > > >>  >> > complicated, let's say the broker is shutdown,
> > log_dir_old's
> > > > > disk
> > > > > > >>  fails,
> > > > > > >>  >> > and the broker starts. In this case broker doesn't even
> > know
> > > > if
> > > > > > >>  >> log_dir_new
> > > > > > >>  >> > has all the data needed for this replica. It becomes a
> > > problem
> > > > > if
> > > > > > the
> > > > > > >>  >> > broker is elected leader of this partition in this case.
> > > > > > >>  >>
> > > > > > >>  >> log_dir_new contains the most recent data so we will lose
> > the
> > > > tail
> > > > > > of
> > > > > > >>  >> partition.
> > > > > > >>  >> This is not a big problem for us because we already delete
> > > tails
> > > > > by
> > > > > > >>  hand
> > > > > > >>  >> (see https://issues.apache.org/jira/browse/KAFKA-1712).
> > > > > > >>  >> Also we don't use automatic leader balancing
> > > > > > >>  (auto.leader.rebalance.enable=false),
> > > > > > >>  >> so this partition becomes the leader with a low
> probability.
> > > > > > >>  >> I think my patch can be modified to prohibit the selection
> > of
> > > > the
> > > > > > >>  leader
> > > > > > >>  >> until the partition has moved completely.
> > > > > > >>  >
> > > > > > >>  > I guess you are saying that you have deleted the tails by
> > hand
> > > in
> > > > > > your
> > > > > > >>  own
> > > > > > >>  > kafka branch. But KAFKA-1712 is not accepted into Kafka
> trunk
> > > > and I
> > > > > > am
> > > > > > >>  not
> > > > > > >>
> > > > > > >>  No. We just modify segments mtime by cron job. This works
> with
> > > > > vanilla
> > > > > > >>  kafka.
> > > > > > >>
> > > > > > >>  > sure if it is the right solution. How would this solution
> > > address
> > > > > the
> > > > > > >>  > problem mentioned above?
> > > > > > >>
> > > > > > >>  If you need only fresh data and if you remove old data by
> hands
> > > > this
> > > > > is
> > > > > > >>  not a problem. But in general case
> > > > > > >>  this is a problem of course.
> > > > > > >>
> > > > > > >>  >
> > > > > > >>  > BTW, I am not sure the solution mentioned in KAFKA-1712 is
> > the
> > > > > right
> > > > > > way
> > > > > > >>  to
> > > > > > >>  > address its problem. Now that we have timestamp in the
> > message
> > > we
> > > > > > can use
> > > > > > >>  > that to delete old segments instead of relying on the log
> > > segment
> > > > > > mtime.
> > > > > > >>  > Just some idea and we don't have to discuss this problem
> > here.
> > > > > > >>  >
> > > > > > >>  >> >
> > > > > > >>  >> > The solution presented in the KIP attempts to handle it
> by
> > > > > > replacing
> > > > > > >>  >> > replica in an atomic version fashion after the log in
> the
> > > new
> > > > > dir
> > > > > > has
> > > > > > >>  >> fully
> > > > > > >>  >> > caught up with the log in the old dir. At any time the
> log
> > > can
> > > > be
> > > > > > >>  >> considered
> > > > > > >>  >> > to exist on only one log directory.
> > > > > > >>  >>
> > > > > > >>  >> As I understand your solution does not cover quotas.
> > > > > > >>  >> What happens if someone starts to transfer 100 partitions
> ?
> > > > > > >>  >
> > > > > > >>  > Good point. Quota can be implemented in the future. It is
> > > > currently
> > > > > > >>  > mentioned as a potential future improvement in KIP-112
> > > > > > >>  > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>.
> > > > > > >>  > Thanks for the reminder. I will move it to KIP-113.
> > > > > > >>  >
> > > > > > >>  >> > If yes, it will read a ByteBufferMessageSet from
> > > > > > topicPartition.log
> > > > > > >>  and
> > > > > > >>  >> append the message set to topicPartition.move
> > > > > > >>  >>
> > > > > > >>  >> i.e. processPartitionData will read data from the
> beginning
> > of
> > > > > > >>  >> topicPartition.log? What is the read size?
> > > > > > >>  >> A ReplicaFetchThread reads many partitions so if one does
> > some
> > > > > > >>  complicated
> > > > > > >>  >> work (= read a lot of data from disk) everything will slow
> > > down.
> > > > > > >>  >> I think read size should not be very big.
> > > > > > >>  >>
> > > > > > >>  >> On the other hand at this point (processPartitionData) one
> > can
> > > > use
> > > > > > only
> > > > > > >>  >> the new data (ByteBufferMessageSet from parameters) and
> wait
> > > > until
> > > > > > >>  >> (topicPartition.move.smallestOffset <=
> > > > > > >>  >> topicPartition.log.smallestOffset
> > > > > > >>  >> && topicPartition.move.largestOffset ==
> > > > > > >>  >> topicPartition.log.largestOffset).
> > > > > > >>  >> In this case the write speed to topicPartition.move and
> > > > > > >>  topicPartition.log
> > > > > > >>  >> will be the same so this will allow us to move many
> > partitions
> > > > to
> > > > > > one
> > > > > > >>  disk.
> > > > > > >>  >
> > > > > > >>  > The read size of a given partition is configured
> > > > > > >>  > using replica.fetch.max.bytes, which is the same size used
> by
> > > > > > >>  FetchRequest
> > > > > > >>  > from follower to leader. If the broker is moving a replica
> > for
> > > > > which
> > > > > > it
> > > > > > >>
> > > > > > >>  OK. Could you mention it in KIP?
> > > > > > >>
> > > > > > >>  > acts as a follower, the disk write rate for moving this
> > replica
> > > > is
> > > > > at
> > > > > > >>  most
> > > > > > >>  > the rate it fetches from leader (assume it is catching up
> and
> > > has
> > > > > > >>  > sufficient data to read from leader, which is subject to
> > > > > > round-trip-time
> > > > > > >>  > between itself and the leader). Thus this part is probably
> > fine
> > > > even
> > > > > > >>  without
> > > > > > >>  > quota.
> > > > > > >>
> > > > > > >>  I think there are 2 problems
> > > > > > >>  1. Without speed limiter this will not work good even for 1
> > > > > partition.
> > > > > > In
> > > > > > >>  our production we had a problem so we did the throughput
> limiter:
> > > > > > >>  https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
> > > > > > >>
> > > > > > >>  2. I don't understand how it will work in case of big
> > > > > > >>  replica.fetch.wait.max.ms and partition with irregular flow.
> > > > > > >>  For example someone could have replica.fetch.wait.max.ms=10minutes
> > > > > > >>  and a partition that has very high data flow from 12:00 to 13:00 and
> > > > > > >>  zero flow otherwise.
> > > > > > >>  In this case processPartitionData could be called once per
> > > > 10minutes
> > > > > > so if
> > > > > > >>  we start data moving in 13:01 it will be finished next day.
> > > > > > >>
> > > > > > >>  >
> > > > > > >>  > But if the broker is moving a replica for which it acts as
> a
> > > > > leader,
> > > > > > as
> > > > > > >>  of
> > > > > > >>  > current KIP the broker will keep reading from log_dir_old
> and
> > > > > append
> > > > > > to
> > > > > > >>  > log_dir_new without having to wait for round-trip-time. We
> > > > probably
> > > > > > need
> > > > > > >>  > quota for this in the future.
> > > > > > >>  >
> > > > > > >>  >> >
> > > > > > >>  >> > And to answer your question, yes topicpartition.log
> refers
> > > to
> > > > > > >>  >> > topic-partition/segment.log.
> > > > > > >>  >> >
> > > > > > >>  >> > Thanks,
> > > > > > >>  >> > Dong
> > > > > > >>  >> >
> > > > > > >>  >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <
> > > > > > >>  aozerit...@yandex.ru>
> > > > > > >>  >> > wrote:
> > > > > > >>  >> >
> > > > > > >>  >> >> Hi,
> > > > > > >>  >> >>
> > > > > > >>  >> >> We have a similar solution that has been working in
> > > > > production
> > > > > > >>  since
> > > > > > >>  >> >> 2014. You can see it here:
> > > > > > >>  >> >> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> > > > > > >>  >> >> The idea is very simple
> > > > > > >>  >> >> 1. Disk balancer runs in a separate thread inside
> > scheduler
> > > > > pool.
> > > > > > >>  >> >> 2. It does not touch empty partitions
> > > > > > >>  >> >> 3. Before it moves a partition it forcibly creates new
> > > > segment
> > > > > > on a
> > > > > > >>  >> >> destination disk
> > > > > > >>  >> >> 4. It moves segment by segment from new to old.
> > > > > > >>  >> >> 5. Log class works with segments on both disks
> > > > > > >>  >> >>
> > > > > > >>  >> >> Your approach seems too complicated, moreover it means
> > that
> > > > you
> > > > > > >>  have to
> > > > > > >>  >> >> patch different components of the system
> > > > > > >>  >> >> Could you clarify what do you mean by
> topicPartition.log?
> > > Is
> > > > it
> > > > > > >>  >> >> topic-partition/segment.log ?
> > > > > > >>  >> >>
> > > > > > >>  >> >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
> > > > > > >>  >> >> > Hi all,
> > > > > > >>  >> >> >
> > > > > > >>  >> >> > We created KIP-113: Support replicas movement between
> > log
> > > > > > >>  >> directories.
> > > > > > >>  >> >> > Please find the KIP wiki in the link
> > > > > > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
> > > > > > >>  >> >> >
> > > > > > >>  >> >> > This KIP is related to KIP-112
> > > > > > >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> > > > > > >>  >> >> > Handle disk failure for JBOD. They are needed in
> order
> > to
> > > > > > support
> > > > > > >>  >> JBOD in
> > > > > > >>  >> >> > Kafka. Please help review the KIP. Your feedback is
> > > > > appreciated!
> > > > > > >>  >> >> >
> > > > > > >>  >> >> > Thanks,
> > > > > > >>  >> >> > Dong
> > > > > >
> > > > >
> > > >
> > >
> >
>
