Hi, Tom,

Thanks for the KIP. A few minor comments below.

1. Implementation-wise, the broker handles adding partitions differently from changing the replica assignment. For the former, we directly update the topic path in ZK with the new partitions. For the latter, we write the new partition reassignment to the partition reassignment path. Changing the replication factor is handled in the same way as changing the replica assignment. So, it would be useful to document how the broker handles these different cases. I think it's simpler to allow just one change (partition count, replication factor, or replica assignment) per request.

2. Currently, we only allow adding partitions. We probably want to document that restriction in the API.

3. It's not very clear to me what status_time in ReplicaStatusResponse means.

Jun

On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Tom,
>
> Thanks for your reply. Here are my thoughts:
>
> 1) I think the DescribeDirsResponse can be used by AdminClient to query the
> lag of a follower replica as well. Here is how it works:
>
> - AdminClient sends DescribeDirsRequest to both the leader and the follower
> of the partition.
> - DescribeDirsResponse from both leader and follower shows the LEO of the
> leader replica and the follower replica.
> - Lag of the follower replica is the difference in the LEO between leader
> and follower.
>
> In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be sent
> to each replica of the partition whereas ReplicaStatusRequest only needs to
> be sent to the leader of the partition. It doesn't seem to make much
> difference though. In practice we probably want to query the replica lag of
> many partitions and AdminClient needs to send exactly one request to each
> broker with either solution. Does this make sense?
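
The LEO comparison described in point 1 above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class `ReplicaLag` and its methods are hypothetical and are not part of Kafka or of either KIP, and the per-broker LEO values are assumed to have already been collected from the responses. Because the leader and follower responses are taken at slightly different times, the sketch clamps the difference at zero; that clamping is also an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: derives follower lag from log end offsets (LEOs).
final class ReplicaLag {

    // Lag of one follower: leader LEO minus follower LEO, clamped at zero
    // (assumption: the two offsets may have been sampled at different moments).
    static long lag(long leaderLeo, long followerLeo) {
        return Math.max(0L, leaderLeo - followerLeo);
    }

    // leoByBroker: the LEO reported by each replica's broker for a single partition.
    // Returns the lag of every replica except the leader, keyed by broker id.
    static Map<Integer, Long> followerLags(int leaderId, Map<Integer, Long> leoByBroker) {
        Long leaderLeo = leoByBroker.get(leaderId);
        if (leaderLeo == null) {
            throw new IllegalArgumentException("no LEO reported for leader " + leaderId);
        }
        Map<Integer, Long> lags = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : leoByBroker.entrySet()) {
            if (e.getKey() != leaderId) {
                lags.put(e.getKey(), lag(leaderLeo, e.getValue()));
            }
        }
        return lags;
    }

    public static void main(String[] args) {
        // Made-up offsets for illustration: broker 1 is the leader.
        Map<Integer, Long> leos = new LinkedHashMap<>();
        leos.put(1, 1500L);
        leos.put(2, 1480L);
        leos.put(3, 1500L);
        System.out.println(followerLags(1, leos)); // {2=20, 3=0}
    }
}
```

With either request type the client ends up with one LEO per replica, so the same computation would apply whether the offsets came from each replica or from the leader alone.
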
>
> 2) KIP-179 proposes to add the following AdminClient API:
>
> alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions
> options)
>
> Where AlteredTopic includes the following fields:
>
> AlteredTopic(String name, int numPartitions, int replicationFactor,
> Map<Integer,List<Integer>> replicasAssignment)
>
> I have two comments on this:
>
> - The information in AlteredTopic seems a bit redundant. Both numPartitions
> and replicationFactor can be derived from replicasAssignment. I think we
> can probably just use the following API in AdminClient instead:
>
> AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> partitionAssignment, AlterTopicsOptions options)
>
> - Do you think "reassignPartitions" may be a better name than
> "alterTopics"? This is more consistent with the existing name used in
> kafka-reassign-partitions.sh and I also find it to be more accurate. On the
> other hand, alterTopics seems to suggest that we can also alter topic
> config.
>
> Thanks,
> Dong
>
> On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
>
> > Hi Dong,
> >
> > Thanks for your reply.
> >
> > You're right that your DescribeDirsResponse contains appropriate data. The
> > comment about the log_end_offset in the KIP says "Enable user to track
> > movement progress by comparing LEO of the *.log and *.move". That makes me
> > wonder whether this would only work for replica movement on the same
> > broker. In the case of reassigning partitions between brokers it's only
> > really the leader for the partition that knows the max LEO of the ISR and
> > the LEO of the target broker. Maybe the comment is misleading and it would
> > be the leader doing the same thing in both cases. Can you clarify this?
> >
> > At a conceptual level there's a lot of similarity between KIP-113 and
> > KIP-179 because they're both about moving replicas around.
> > Concretely though, ChangeReplicaDirRequest assumes the movement is on the
> > same broker, while the equivalent APIs in KIP-179 (whichever alternative)
> > lack the notion of disks. I wonder if a unified API would be better or not.
> > Operationally, changing replicas between brokers is likely to be motivated
> > by balancing load across the cluster, or adding/removing brokers. But the
> > JBOD use case has different motivations. So I suspect it's OK that the APIs
> > are separate, but I'd love to hear what others think.
> >
> > I think you're right about TopicPartitionReplica. At the protocol level we
> > could group topics and partitions together to avoid having the same topic
> > name multiple times when querying for the status of all the partitions of a
> > topic.
> >
> > Thanks again for taking the time to respond.
> >
> > Tom
> >
> > On 3 August 2017 at 23:07, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Tom,
> > >
> > > Thanks for the KIP. It seems that the prior discussion in this thread has
> > > focused on reassigning partitions (or AlterTopics). I haven't looked into
> > > this yet. I have two comments on the replicaStatus() API and the
> > > ReplicaStatusRequest:
> > >
> > > - It seems that the use-case for ReplicaStatusRequest is covered by
> > > the DescribeDirsRequest introduced in KIP-113
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
> > > DescribeDirsResponse contains the logEndOffset for the specified
> > > partitions. The lag of the follower replica can be derived as the
> > > difference between leader's LEO and follower's LEO. Do you think we can
> > > simply use DescribeDirsRequest without introducing ReplicaStatusRequest?
> > >
> > > - According to the current KIP, replicaStatus() takes a list of partitions
> > > as input. And its output maps each partition to a list of replica statuses.
> > > Do you think it would be better to have a new class called
> > > TopicPartitionReplica, which identifies the topic, partition and the
> > > brokerId? replicaStatus() can then take a list of TopicPartitionReplica as
> > > input, and its output maps each replica to its status. The latter API
> > > seems simpler and also matches the method name better. What do you think?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
> > >
> > > > Hi again Ismael,
> > > >
> > > > >> 1. It's worth emphasising that reassigning partitions is a different
> > > > >> process than what happens when a topic is created, so not sure trying
> > > > >> to make it symmetric is beneficial. In addition to what was already
> > > > >> discussed, one should also enable replication throttling before moving
> > > > >> the data.
> > > > >
> > > > > Thanks for your responses. The only advantage I can see from having
> > > > > separate APIs for partition count change vs. the other changes is that
> > > > > we expect the former to return synchronously, and the latter to
> > > > > (usually) return asynchronously. Lumping them into the same API forces
> > > > > clients of that API to code for the two possible cases. Is this the
> > > > > particular concern you had in mind, or do you have others?
> > > >
> > > > Just to help see what this looks like I've created another version of
> > > > KIP-179 (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> > > > ).
> > > >
> > > > The AdminClient APIs are more type safe (harder to misuse).
> > > > The asynchronous nature of the alterReplicationFactors() and
> > > > reassignPartitions() methods isn't really apparent (you'd still have to
> > > > read the javadocs to know you had to call replicaStatus() to determine
> > > > completion), but it's still better than the lumped-together API because
> > > > the methods are _consistently_ async. The Protocol APIs are slightly
> > > > nicer too, in that INVALID_REQUEST is less commonly returned. Another
> > > > benefit is it would allow for these different alteration APIs to be
> > > > modified independently in the future, should that be necessary.
> > > >
> > > > What do others think?
> > > >
> > > > > You're right that the proposal doesn't cover setting and clearing a
> > > > > throttle, and it should. I will study the code about how this works
> > > > > before posting back about that.
> > > >
> > > > AFAICS from the code the throttle is simply a dynamic config of the
> > > > broker. We already have the AdminClient.alterConfigs API for setting
> > > > this, so the refactoring of kafka-reassign-partitions.sh could just use
> > > > that existing API. We could still achieve your objective of setting the
> > > > throttle before reassignment by making the --throttle option mandatory
> > > > when --execute is present. Or did you have something else in mind?
> > > >
> > > > Thanks,
> > > >
> > > > Tom
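
The suggestion of making --throttle mandatory whenever --execute is present amounts to a small argument check, which might look something like the sketch below. The class `ReassignOptionsCheck`, its `validate` method, and the error message are hypothetical and are not taken from the actual kafka-reassign-partitions.sh implementation; only the two option names come from the discussion above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical argument check, not the actual kafka-reassign-partitions.sh code.
final class ReassignOptionsCheck {

    // Returns an error message, or null when the combination of options is acceptable.
    static String validate(String... args) {
        Set<String> present = new HashSet<>(Arrays.asList(args));
        // Proposed rule: an --execute run must also name a throttle.
        if (present.contains("--execute") && !present.contains("--throttle")) {
            return "--throttle is required when --execute is specified";
        }
        return null;
    }

    public static void main(String[] args) {
        // Rejected: --execute without --throttle.
        System.out.println(validate("--execute"));
        // Accepted: the throttle value here is an arbitrary placeholder.
        System.out.println(validate("--execute", "--throttle", "50000000"));
        // Accepted: the rule only applies to --execute.
        System.out.println(validate("--verify"));
    }
}
```

A check like this only guarantees that a throttle is requested; actually applying it before the reassignment starts would still be a separate step, for example through the existing config-alteration API mentioned above.
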