Hi all, I've updated the KIP as follows:
* removed the APIs supporting progress reporting in favour of the APIs being implemented in KIP-113.
* added some APIs to cover the existing functionality around throttling inter-broker transfers, which was previously a TODO.

To respond to Colin's suggestion:

> TopicAlterationResult alterTopics(TopicAlterationOptions options,
>     Map<String, TopicAlteration> alters)
>
> PartitionCountAlteration(int numPartitions) implements TopicAlteration
>
> ReplicationFactorAlteration(int repl) implements TopicAlteration
>
> ReassignPartitionsAlteration(...) implements TopicAlteration

That is a workable alternative to providing 3 separate methods on the AdminClient, but I don't see why it is objectively better. I don't see clients commonly needing to do a mixture of alterations, and it assumes that the same options make sense for all alterations.

Cheers,

Tom

On 22 August 2017 at 23:49, Colin McCabe <cmcc...@apache.org> wrote:

> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> > Hi Dong and Jun,
> >
> > Thanks again for your input in this discussion and on KIP-113. It's difficult that discussion is split between this thread and the one for KIP-113, but I'll try to respond on this thread to questions asked on this thread.
> >
> > It seems there is some consensus that the alterTopic() API is the wrong thing, and it would make more sense to separate the different kinds of alteration into separate APIs. It seems to me there is then a choice.
> >
> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and reassignPartitions() methods. This would most closely match the facilities currently offered by kafka-alter-topics and kafka-reassign-partitions.
> > 2. Just provide a reassignPartitions() method and infer from the shape of the data passed to that that a change in replication factor and/or partition count is required, as suggested by Dong.
> >
> > The choice we make here is also relevant to KIP-113 and KIP-178.
> > By choosing (1) we can change the replication factor or partition count without providing an assignment and therefore are necessarily requiring the controller to make a decision for us about which broker (and, for 113, which log directory and thus which disk) the replica should reside on. There is then the matter of what criteria the controller should use to make that decision (a decision is also required on topic creation of course, but I'll try not to go there right now).
>
> Hmm. One approach would be to have something like
>
> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >     Map<String, TopicAlteration> alters)
> >
> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >
> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >
> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > Choosing (2), on the other hand, forces us to make an assignment, and currently the AdminClient doesn't provide the APIs necessary to make a very informed decision. To do the job properly we'd need APIs to enumerate the permissible log directories on each broker, know the current disk usage etc. These just don't exist today, and I think it would be a lot of work to bite off to specify them. We've already got three KIPs on the go.
> >
> > So, for the moment, I think we have to choose 1, for pragmatic reasons. There is nothing to stop us deprecating alterPartitionCount() and alterReplicationFactor() and moving to (2) at a later date.
>
> Yeah, the API should be pragmatic. An API that doesn't let us do what we want to do (like let Kafka assign us a new replica on the least loaded node) is not good.
>
> > To answer a specific question of Jun's:
> >
> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse means.
> >
> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was the epoch offset at which the controller's notion of the lag was current (which I think is the epoch offset of the last FetchRequest from the follower). At one point I was considering some other ways of determining progress and completion, and TBH I think it was more pertinent to those rejected alternatives.
> >
> > I've already made some changes to KIP-179. As suggested in the thread for KIP-113, I will be changing some more since I think we can use DescribeDirsRequest instead of ReplicaStatusResponse to implement the --progress option.
>
> That makes sense. Whatever response is used, it would be nice to have an error message in addition to an error code.
>
> best,
> Colin
>
> > Cheers,
> >
> > Tom
> >
> > On 9 August 2017 at 02:28, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Tom,
> > >
> > > Thanks for the KIP. A few minor comments below.
> > >
> > > 1. Implementation wise, the broker handles adding partitions differently from changing replica assignment. For the former, we directly update the topic path in ZK with the new partitions. For the latter, we write the new partition reassignment in the partition reassignment path. Changing the replication factor is handled in the same way as changing replica assignment. So, it would be useful to document how the broker handles these different cases accordingly. I think it's simpler to just allow one change (partition, replication factor, or replica assignment) in a request.
> > >
> > > 2. Currently, we only allow adding partitions. We probably want to document the restriction in the api.
> > >
> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse means.
> > >
> > > Jun
> > >
> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Tom,
> > > >
> > > > Thanks for your reply. Here are my thoughts:
> > > >
> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to query the lag of follower replica as well. Here is how it works:
> > > >
> > > > - AdminClient sends DescribeDirsRequest to both the leader and the follower of the partition.
> > > > - DescribeDirsResponse from both leader and follower shows the LEO of the leader replica and the follower replica.
> > > > - Lag of the follower replica is the difference in the LEO between leader and follower.
> > > >
> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be sent to each replica of the partition whereas ReplicaStatusRequest only needs to be sent to the leader of the partition. It doesn't seem to make much difference though. In practice we probably want to query the replica lag of many partitions and AdminClient needs to send exactly one request to each broker with either solution. Does this make sense?
> > > >
> > > > 2) KIP-179 proposes to add the following AdminClient API:
> > > >
> > > > alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions options)
> > > >
> > > > Where AlteredTopic includes the following fields
> > > >
> > > > AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment)
> > > >
> > > > I have two comments on this:
> > > >
> > > > - The information in AlteredTopic seems a bit redundant. Both numPartitions and replicationFactor can be derived from replicasAssignment.
> > > > I think we can probably just use the following API in AdminClient instead:
> > > >
> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>> partitionAssignment, AlterTopicsOptions options)
> > > >
> > > > - Do you think "reassignPartitions" may be a better name than "alterTopics"? This is more consistent with the existing name used in kafka-reassign-partitions.sh and I also find it to be more accurate. On the other hand, alterTopics seems to suggest that we can also alter topic config.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for your reply.
> > > > >
> > > > > You're right that your DescribeDirsResponse contains appropriate data. The comment about the log_end_offset in the KIP says "Enable user to track movement progress by comparing LEO of the *.log and *.move". That makes me wonder whether this would only work for replica movement on the same broker. In the case of reassigning partitions between brokers it's only really the leader for the partition that knows the max LEO of the ISR and the LEO of the target broker. Maybe the comment is misleading and it would be the leader doing the same thing in both cases. Can you clarify this?
> > > > >
> > > > > At a conceptual level there's a lot of similarity between KIP-113 and KIP-179 because they're both about moving replicas around. Concretely though, ChangeReplicaDirRequest assumes the movement is on the same broker, while the equivalent APIs in KIP-179 (whichever alternative) lack the notion of disks. I wonder if a unified API would be better or not.
> > > > > Operationally, the reasons for changing replicas between brokers are likely to be motivated by balancing load across the cluster, or adding/removing brokers. But the JBOD use case has different motivations. So I suspect it's OK that the APIs are separate, but I'd love to hear what others think.
> > > > >
> > > > > I think you're right about TopicPartitionReplica. At the protocol level we could group topics and partitions together to avoid having the same topic name multiple times when querying for the status of all the partitions of a topic.
> > > > >
> > > > > Thanks again for taking the time to respond.
> > > > >
> > > > > Tom
> > > > >
> > > > > On 3 August 2017 at 23:07, Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hey Tom,
> > > > > >
> > > > > > Thanks for the KIP. It seems that the prior discussion in this thread has focused on reassigning partitions (or AlterTopics). I haven't looked into this yet. I have two comments on the replicaStatus() API and the ReplicaStatusRequest:
> > > > > >
> > > > > > - It seems that the use-case for ReplicaStatusRequest is covered by the DescribeDirsRequest introduced in KIP-113 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>. DescribeDirsResponse contains the logEndOffset for the specified partitions. The lag of the follower replica can be derived as the difference between leader's LEO and follower's LEO. Do you think we can simply use DescribeDirsRequest without introducing ReplicaStatusRequest?
> > > > > >
> > > > > > - According to the current KIP, replicaStatus() takes a list of partitions as input.
> > > > > > And its output maps the partition to a list of replica status. Do you think it would be better to have a new class called TopicPartitionReplica, which identifies the topic, partition and the brokerId? replicaStatus can then take a list of TopicPartitionReplica as input. And its output maps the replica to replica status. The latter API seems simpler and also matches the method name better. What do you think?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi again Ismael,
> > > > > > >
> > > > > > > >> 1. It's worth emphasising that reassigning partitions is a different process than what happens when a topic is created, so not sure trying to make it symmetric is beneficial. In addition to what was already discussed, one should also enable replication throttling before moving the data.
> > > > > > > >
> > > > > > > > Thanks for your responses. The only advantage I can see from having separate APIs for partition count change vs. the other changes is that we expect the former to return synchronously, and the latter to (usually) return asynchronously. Lumping them into the same API forces clients of that API to code for the two possible cases. Is this the particular concern you had in mind, or do you have others?
> > > > > > >
> > > > > > > Just to help see what this looks like I've created another version of KIP-179 (https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient).
> > > > > > >
> > > > > > > The AdminClient APIs are more type safe (harder to misuse). The asynchronous nature of the alterReplicationFactors() and reassignPartitions() methods isn't really apparent (you'd still have to read the javadocs to know you had to call replicaStatus() to determine completion), but it's still better than the lumped-together API because the methods are _consistently_ async. The Protocol APIs are slightly nicer too, in that INVALID_REQUEST is less commonly returned. Another benefit is it would allow for these different alteration APIs to be modified independently in the future, should that be necessary.
> > > > > > >
> > > > > > > What do others think?
> > > > > > >
> > > > > > > > You're right that the proposal doesn't cover setting and clearing a throttle, and it should. I will study the code about how this works before posting back about that.
> > > > > > >
> > > > > > > AFAICS from the code the throttle is simply a dynamic config of the broker. We already have the AdminClient.alterConfigs API for setting this, so the refactoring of kafka-reassign-partitions.sh could just use that existing API.
> > > > > > > We could still achieve your objective of setting the throttle before reassignment by making the --throttle option mandatory when --execute was present. Or did you have something else in mind?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Tom
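
For reference, the lag calculation Dong describes in the quoted discussion (take the leader's LEO and the follower's LEO for each partition, as reported in the DescribeDirsResponse from each broker, and subtract) could be sketched roughly as below. This is only an illustration under stated assumptions: the class and method names are hypothetical, plain maps keyed by a "topic-partition" string stand in for the real responses, and no actual AdminClient call is shown. Clamping at zero is my own assumption, since the two responses are not taken at the same instant.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka or of KIP-179/KIP-113.
public class FollowerLagSketch {

    /**
     * Computes per-partition follower lag as (leader LEO - follower LEO).
     * Both maps are assumed to be extracted from the responses returned by
     * the leader's and the follower's broker; keys are "topic-partition".
     */
    public static Map<String, Long> followerLag(Map<String, Long> leaderLeo,
                                                Map<String, Long> followerLeo) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> entry : leaderLeo.entrySet()) {
            Long follower = followerLeo.get(entry.getKey());
            if (follower == null) {
                continue; // follower has not reported this partition yet
            }
            // Assumption: clamp at zero, because the follower's response may
            // have been taken slightly later than the leader's.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - follower));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> leader = new HashMap<>();
        leader.put("my-topic-0", 1500L);
        leader.put("my-topic-1", 900L);

        Map<String, Long> follower = new HashMap<>();
        follower.put("my-topic-0", 1200L);
        follower.put("my-topic-1", 900L);

        // A lag of zero for every partition being moved would indicate that
        // the reassignment has caught up.
        System.out.println(followerLag(leader, follower));
    }
}
```

A --progress style option could poll this repeatedly and report the remaining lag per partition; as noted in the thread, either approach still needs one request per broker.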