bq. What about startPartitionAssignment()?

Makes sense. startPartitionReassignment() seems better, since the API deals with reassignment.
Cheers

On Tue, Sep 5, 2017 at 9:39 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:

> I've revised this KIP again:
>
> * Change the alterPartitionCounts() API to support passing an optional assignment for the new partitions (which is already supported by kafka-topics.sh). At the same time I didn't want the API to suggest it was possible to change the existing assignments in the same call (which isn't supported today).
>
> * Removed alterReplicationFactor(), since it's really just a special case of reassignPartitions(): In both cases it is necessary to give a complete partition assignment and both can be long running due to data movement.
>
> Outstanding questions:
>
> 1. Is the proposed alterInterReplicaThrottle() API really better than changing the throttle via alterConfigs()? It wouldn't be necessary to support alterConfigs() for all broker configs, just DynamicConfigs (which can't be set via the config file anyway). Would it be a problem that triggering the reassignment required ClusterAction on the Cluster, but throttling the assignment required Alter on the Topic? What if a user had the former permission, but not the latter?
>
> 2. Is reassignPartitions() really the best name? I find the distinction between reassigning and just assigning to be of limited value, and "reassign" is misleading when the API is now used for changing the replication factor too. Since the API is asynchronous/long running, it might be better to indicate that in the name somehow. What about startPartitionAssignment()?
>
> I am, of course, interested in any other questions or comments people have.
>
> On 30 August 2017 at 16:17, Tom Bentley <t.j.bent...@gmail.com> wrote:
>
> > Hi all,
> >
> > I've updated the KIP as follows:
> >
> > * remove the APIs supporting progress reporting in favour of the APIs being implemented in KIP-113.
> > * added some APIs to cover the existing functionality around throttling inter-broker transfers, which was previously a TODO.
> >
> > To respond to Colin's suggestion:
> >
> > > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> > > Map<String, TopicAlteration> alters)
> > >
> > > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> > >
> > > ReplicationFactorAlteration(int repl) implements TopicAlteration
> > >
> > > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > That is a workable alternative to providing 3 separate methods on the AdminClient, but I don't see why it is objectively better. I don't see clients commonly needing to do a mixture of alterations, and it assumes that the options make sense for all alterations.
> >
> > Cheers,
> >
> > Tom
> >
> > On 22 August 2017 at 23:49, Colin McCabe <cmcc...@apache.org> wrote:
> >
> >> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> >> > Hi Dong and Jun,
> >> >
> >> > Thanks again for your input in this discussion and on KIP-113. It's difficult that discussion is split between this thread and the one for KIP-113, but I'll try to respond on this thread to questions asked on this thread.
> >> >
> >> > It seems there is some consensus that the alterTopic() API is the wrong thing, and it would make more sense to separate the different kinds of alteration into separate APIs. It seems to me there is then a choice.
> >> >
> >> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and reassignPartitions() methods. This would most closely match the facilities currently offered by kafka-alter-topics and kafka-reassign-partitions.
> >> > 2. Just provide a reassignPartitions() method and infer from the shape of the data passed to that that a change in replication factor and/or partition count is required, as suggested by Dong.
> >> >
> >> > The choice we make here is also relevant to KIP-113 and KIP-178. By choosing (1) we can change the replication factor or partition count without providing an assignment and therefore are necessarily requiring the controller to make a decision for us about which broker (and, for 113, which log directory and thus which disk) the replica should reside. There is then the matter of what criteria the controller should use to make that decision (a decision is also required on topic creation of course, but I'll try not to go there right now).
> >>
> >> Hmm. One approach would be to have something like
> >>
> >> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >> > Map<String, TopicAlteration> alters)
> >> >
> >> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >> >
> >> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >> >
> >> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >>
> >> >
> >> > Choosing (2), on the other hand, forces us to make an assignment, and currently the AdminClient doesn't provide the APIs necessary to make a very informed decision. To do the job properly we'd need APIs to enumerate the permissible log directories on each broker, know the current disk usage etc. These just don't exist today, and I think it would be a lot of work to bite off to specify them. We've already got three KIPs on the go.
> >> >
> >> > So, for the moment, I think we have to choose 1, for pragmatic reasons. There is nothing to stop us deprecating alterPartitionCount() and alterReplicationFactor() and moving to (2) at a later date.
> >>
> >> Yeah, the API should be pragmatic. An API that doesn't let us do what we want to do (like let Kafka assign us a new replica on the least loaded node) is not good.
> >>
> >> >
> >> > To answer a specific question of Jun's:
> >> >
> >> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse means.
> >> >
> >> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was the epoch offset at which the controller's notion of the lag was current (which I think is the epoch offset of the last FetchRequest from the follower). At one point I was considering some other ways of determining progress and completion, and TBH I think it was more pertinent to those rejected alternatives.
> >> >
> >> > I've already made some changes to KIP-179. As suggested in the thread for KIP-113, I will be changing some more since I think we can use DescribeDirsRequest instead of ReplicaStatusResponse to implement the --progress option.
> >>
> >> That makes sense. Whatever response is used, it would be nice to have an error message in addition to an error code.
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> > On 9 August 2017 at 02:28, Jun Rao <j...@confluent.io> wrote:
> >> >
> >> > > Hi, Tom,
> >> > >
> >> > > Thanks for the KIP. A few minor comments below.
> >> > >
> >> > > 1. Implementation wise, the broker handles adding partitions differently from changing replica assignment. For the former, we directly update the topic path in ZK with the new partitions. For the latter, we write the new partition reassignment in the partition reassignment path. Changing the replication factor is handled in the same way as changing replica assignment. So, it would be useful to document how the broker handles these different cases accordingly. I think it's simpler to just allow one change (partition, replication factor, or replica assignment) in a request.
> >> > >
> >> > > 2. Currently, we only allow adding partitions. We probably want to document the restriction in the api.
> >> > >
> >> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse means.
> >> > >
> >> > > Jun
> >> > >
> >> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <lindon...@gmail.com> wrote:
> >> > >
> >> > > > Hey Tom,
> >> > > >
> >> > > > Thanks for your reply. Here are my thoughts:
> >> > > >
> >> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to query the lag of follower replica as well. Here is how it works:
> >> > > >
> >> > > > - AdminClient sends DescribeDirsRequest to both the leader and the follower of the partition.
> >> > > > - DescribeDirsResponse from both leader and follower shows the LEO of the leader replica and the follower replica.
> >> > > > - Lag of the follower replica is the difference in the LEO between leader and follower.
> >> > > >
> >> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be sent to each replica of the partition whereas ReplicaStatusRequest only needs to be sent to the leader of the partition. It doesn't seem to make much difference though. In practice we probably want to query the replica lag of many partitions and AdminClient needs to send exactly one request to each broker with either solution. Does this make sense?
> >> > > >
> >> > > > 2) KIP-179 proposes to add the following AdminClient API:
> >> > > >
> >> > > > alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions options)
> >> > > >
> >> > > > Where AlteredTopic includes the following fields
> >> > > >
> >> > > > AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment)
> >> > > >
> >> > > > I have two comments on this:
> >> > > >
> >> > > > - The information in AlteredTopic seems a bit redundant. Both numPartitions and replicationFactor can be derived from replicasAssignment. I think we can probably just use the following API in AdminClient instead:
> >> > > >
> >> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>> partitionAssignment, AlterTopicsOptions options)
> >> > > >
> >> > > > - Do you think "reassignPartitions" may be a better name than "alterTopics"? This is more consistent with the existing name used in kafka-reassign-partitions.sh and I also find it to be more accurate. On the other hand, alterTopics seems to suggest that we can also alter topic config.
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Thanks for your reply.
> >> > > > >
> >> > > > > You're right that your DescribeDirsResponse contains appropriate data. The comment about the log_end_offset in the KIP says "Enable user to track movement progress by comparing LEO of the *.log and *.move". That makes me wonder whether this would only work for replica movement on the same broker.
> >> > > > > In the case of reassigning partitions between brokers it's only really the leader for the partition that knows the max LEO of the ISR and the LEO of the target broker. Maybe the comment is misleading and it would be the leader doing the same thing in both cases. Can you clarify this?
> >> > > > >
> >> > > > > At a conceptual level there's a lot of similarity between KIP-113 and KIP-179 because they're both about moving replicas around. Concretely though, ChangeReplicaDirRequest assumes the movement is on the same broker, while the equivalent APIs in KIP-179 (whichever alternative) lack the notion of disks. I wonder if a unified API would be better or not. Operationally, the reasons for changing replicas between brokers are likely to be motivated by balancing load across the cluster, or adding/removing brokers. But the JBOD use case has different motivations. So I suspect it's OK that the APIs are separate, but I'd love to hear what others think.
> >> > > > >
> >> > > > > I think you're right about TopicPartitionReplica. At the protocol level we could group topics and partitions together to avoid having the same topic name multiple times when querying for the status of all the partitions of a topic.
> >> > > > >
> >> > > > > Thanks again for taking the time to respond.
> >> > > > >
> >> > > > > Tom
> >> > > > >
> >> > > > > On 3 August 2017 at 23:07, Dong Lin <lindon...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hey Tom,
> >> > > > > >
> >> > > > > > Thanks for the KIP.
> >> > > > > > It seems that the prior discussion in this thread has focused on reassigning partitions (or AlterTopics). I haven't looked into this yet. I have two comments on the replicaStatus() API and the ReplicaStatusRequest:
> >> > > > > >
> >> > > > > > - It seems that the use-case for ReplicaStatusRequest is covered by the DescribeDirsRequest introduced in KIP-113 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>. DescribeDirsResponse contains the logEndOffset for the specified partitions. The lag of the follower replica can be derived as the difference between leader's LEO and follower's LEO. Do you think we can simply use DescribeDirsRequest without introducing ReplicaStatusRequest?
> >> > > > > >
> >> > > > > > - According to the current KIP, replicaStatus() takes a list of partitions as input. And its output maps the partition to a list of replica status. Do you think it would be better to have a new class called TopicPartitionReplica, which identifies the topic, partition and the brokerId. replicaStatus can then take a list of TopicPartitionReplica as input. And its output maps the replica to replica status. The latter API seems simpler and also matches the method name better. What do you think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > Hi again Ismael,
> >> > > > > > >
> >> > > > > > > >> 1. It's worth emphasising that reassigning partitions is a different process than what happens when a topic is created, so not sure trying to make it symmetric is beneficial. In addition to what was already discussed, one should also enable replication throttling before moving the data.
> >> > > > > > > >
> >> > > > > > > > Thanks for your responses. The only advantage I can see from having separate APIs for partition count change vs. the other changes is that we expect the former to return synchronously, and the latter to (usually) return asynchronously. Lumping them into the same API forces clients of that API to code for the two possible cases. Is this the particular concern you had in mind, or do you have others?
> >> > > > > > >
> >> > > > > > > Just to help see what this looks like I've created another version of KIP-179 (https://cwiki.apache.org/confluence/display/KAFKA/Copy+of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient).
> >> > > > > > >
> >> > > > > > > The AdminClient APIs are more type safe (harder to misuse).
> >> > > > > > > The asynchronous nature of the alterReplicationFactors() and reassignPartitions() methods isn't really apparent (you'd still have to read the javadocs to know you had to call replicaStatus() to determine completion), but it's still better than the lumped-together API because the methods are _consistently_ async. The Protocol APIs are slightly nicer too, in that INVALID_REQUEST is less commonly returned. Another benefit is it would allow for these different alteration APIs to be modified independently in the future, should that be necessary.
> >> > > > > > >
> >> > > > > > > What do others think?
> >> > > > > > >
> >> > > > > > > > You're right that the proposal doesn't cover setting and clearing a throttle, and it should. I will study the code about how this works before posting back about that.
> >> > > > > > >
> >> > > > > > > AFAICS from the code the throttle is simply a dynamic config of the broker. We already have the AdminClient.alterConfigs API for setting this, so the refactoring of kafka-reassign-partitions.sh could just use that existing API. We could still achieve your objective of setting the throttle before reassignment by making the --throttle option mandatory when --execute was present. Or did you have something else in mind?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Tom