On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote: > Hi Dong and Jun, > > Thanks again for your input in this discussion and on KIP-113. It's > difficult that discussion is split between this thread and the one for > KIP-113, but I'll try to respond on this thread to questions asked on > this > thread. > > It seems there is some consensus that the alterTopic() API is the wrong > thing, and it would make more sense to separate the different kinds of > alteration into separate APIs. It seems to me there is then a choice. > > 1. Have separate alterPartitionCount(), alterReplicationFactor() and > reassignPartitions() methods. This would most closely match the > facilities > currently offered by kafka-alter-topics and kafka-reassign-partitions. > 2. Just provide a reassignPartitions() method and infer from the shape of > the data passed to that that a change in replication factor and/or > partition count is required, as suggested by Dong. > > The choice we make here is also relevant to KIP-113 and KIP-178. By > choosing (1) we can change the replication factor or partition count > without providing an assignment and therefore are necessarily requiring > the > controller to make a decision for us about which broker (and, for 113, > which log directory and thus which disk) the replica should be reside. > There is then the matter of what criteria the controller should use to > make > that decision (a decision is also required on topic creation of course, > but > I'll try not to go there right now).
Hmm. One approach would be to have something like > TopicAlterationResult alterTopics(TopicAlterationOptions options, > Map<String, TopicAlteration> alters) > > PartitionCountAlteration(int numPartitions) implements TopicAlteration > > ReplicationFactorAlteration(int repl) implements TopicAlteration > > ReassignPartitionsAlteration(...) implements TopicAlteration > > Choosing (2), on the other hand, forces us to make an assignment, and > currently the AdminClient doesn't provide the APIs necessary to make a > very > informed decision. To do the job properly we'd need APIs to enumerate the > permissible log directories on each broker, know the current disk usage > etc. These just don't exist today, and I think it would be a lot of work > to > bite off to specify them. We've already got three KIPs on the go. > > So, for the moment, I think we have to choose 1, for pragmatic reasons. > There is nothing to stop us deprecating alterPartitionCount() and > alterReplicationFactor() and moving to (2) at a later date. Yeah, the API should be pragmatic. An API that doesn't let us do what we want to do (like let Kafka assign us a new replica on the least loaded node) is not good. > > To answer a specific questions of Jun's: > > 3. It's not very clear to me what status_time in ReplicaStatusResponse > > means. > > > It corresponds to the ReplicaStatus.statusTime, and the idea was it was > the > epoch offset at which the controllers notion of the lag was current > (which > I think is the epoch offset of the last FetchRequest from the follower). > At > one point I was considering some other ways of determining progress and > completion, and TBH I think it was more pertinent to those rejected > alternatives. > > I've already made some changes to KIP-179. As suggested in the thread for > KIP-113, I will be changing some more since I think we can > DescribeDirsRequest > instead of ReplicaStatusResponse to implement the --progress option. That makes sense. Whatever response is used, it would be nice to have an error message in addition to an error code. best, Colin > > Cheers, > > Tom > > > > On 9 August 2017 at 02:28, Jun Rao <j...@confluent.io> wrote: > > > Hi, Tom, > > > > Thanks for the KIP. A few minor comments below. > > > > 1. Implementation wise, the broker handles adding partitions differently > > from changing replica assignment. For the former, we directly update the > > topic path in ZK with the new partitions. For the latter, we write the new > > partition reassignment in the partition reassignment path. Changing the > > replication factor is handled in the same way as changing replica > > assignment. So, it would be useful to document how the broker handles these > > different cases accordingly. I think it's simpler to just allow one change > > (partition, replication factor, or replica assignment) in a request. > > > > 2. Currently, we only allow adding partitions. We probably want to document > > the restriction in the api. > > > > 3. It's not very clear to me what status_time in ReplicaStatusResponse > > means. > > > > Jun > > > > > > > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <lindon...@gmail.com> wrote: > > > > > Hey Tom, > > > > > > Thanks for your reply. Here are my thoughts: > > > > > > 1) I think the DescribeDirsResponse can be used by AdminClient to query > > the > > > lag of follower replica as well. Here is how it works: > > > > > > - AdminClient sends DescribeDirsRequest to both the leader and the > > follower > > > of the partition. > > > - DescribeDirsResponse from both leader and follower shows the LEO of the > > > leader replica and the follower replica. > > > - Lag of the follower replica is the difference in the LEO between leader > > > and follower. > > > > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be > > send > > > to each replica of the partition whereas ReplicaStatusRequest only needs > > to > > > be sent to the leader of the partition. It doesn't seem to make much > > > difference though. In practice we probably want to query the replica lag > > of > > > many partitions and AminClient needs to send exactly one request to each > > > broker with either solution. Does this make sense? > > > > > > > > > 2) KIP-179 proposes to add the following AdminClient API: > > > > > > alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions > > > options) > > > > > > Where AlteredTopic includes the following fields > > > > > > AlteredTopic(String name, int numPartitions, int replicationFactor, > > > Map<Integer,List<Integer>> replicasAssignment) > > > > > > I have two comments on this: > > > > > > - The information in AlteredTopic seems a bit redundant. Both > > numPartitions > > > and replicationFactor can be derived from replicasAssignment. I think we > > > can probably just use the following API in AdminClient instead: > > > > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>> > > > partitionAssignment, AlterTopicsOptions options) > > > > > > - Do you think "reassignPartitions" may be a better name than > > > "alterTopics"? This is more consistent with the existing name used in > > > kafka-reassign-partitions.sh and I also find it to be more accurate. On > > the > > > other hand, alterTopics seems to suggest that we can also alter topic > > > config. > > > > > > > > > > > > Thanks, > > > Dong > > > > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t.j.bent...@gmail.com> > > wrote: > > > > > > > Hi Dong, > > > > > > > > Thanks for your reply. > > > > > > > > You're right that your DescribeDirsResponse contains appropriate data. > > > The > > > > comment about the log_end_offset in the KIP says "Enable user to track > > > > movement progress by comparing LEO of the *.log and *.move". That makes > > > me > > > > wonder whether this would only work for replica movement on the same > > > > broker. In the case of reassigning partitions between brokers it's only > > > > really the leader for the partition that knows the max LEO of the ISR > > and > > > > the LEO of the target broker. Maybe the comment is misleading and it > > > would > > > > be the leader doing the same thing in both cases. Can you clarify this? > > > > > > > > > > At a conceptual level there's a lot of similarity between KIP-113 and > > > > KIP-179 because they're both about moving replicas around. Concretely > > > > though, ChangeReplicaDirRequest assumes the movement is on the same > > > broker, > > > > while the equivalent APIs in KIP-179 (whichever alternative) lack the > > > > notion of disks. I wonder if a unified API would be better or not. > > > > Operationally, the reasons for changing replicas between brokers are > > > likely > > > > to be motivated by balancing load across the cluster, or > > adding/removing > > > > brokers. But the JBOD use case has different motivations. So I suspect > > > it's > > > > OK that the APIs are separate, but I'd love to hear what others think. > > > > > > > > > > I think you're right about TopicPartitionReplica. At the protocol level > > > we > > > > could group topics and partitions together so avoid having the same > > topic > > > > name multiple times when querying for the status of all the partitions > > > of a > > > > topic. > > > > > > > > Thanks again for taking the time to respond. > > > > > > > > Tom > > > > > > > > On 3 August 2017 at 23:07, Dong Lin <lindon...@gmail.com> wrote: > > > > > > > > > Hey Tom, > > > > > > > > > > Thanks for the KIP. It seems that the prior discussion in this thread > > > has > > > > > focused on reassigning partitions (or AlterTopics). I haven't looked > > > into > > > > > this yet. I have two comments on the replicaStatus() API and the > > > > > ReplicaStatusRequest: > > > > > > > > > > - It seems that the use-case for ReplicaStatusRequest is covered by > > > > > the DescribeDirsRequest introduced in KIP-113 > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > > 113%3A+Support+replicas+movement+between+log+directories>. > > > > > DescribeDirsResponse contains the logEndOffset for the specified > > > > > partitions. The the lag of the follower replica can be derived as the > > > > > difference between leader's LEO and follower's LEO. Do you think we > > can > > > > > simply use DescribeDirsRequest without introducing > > > ReplicaStatusRequest? > > > > > > > > > > - According to the current KIP, replicaStatus() takes a list of > > > > partitions > > > > > as input. And its output maps the partition to a list of replica > > > status. > > > > Do > > > > > you think it would be better to have a new class called > > > > > TopicPartitionReplica, which identifies the topic, partition and the > > > > > brokerId. replicaStatus can then take a list of TopicPartitionReplica > > > as > > > > > input. And its output maps the replica to replica status. The latter > > > API > > > > > seems simpler and also matches the method name better. What do you > > > think? > > > > > > > > > > Thanks, > > > > > Dong > > > > > > > > > > > > > > > > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t.j.bent...@gmail.com> > > > > wrote: > > > > > > > > > > > Hi again Ismael, > > > > > > > > > > > > > > > > > > 1. It's worth emphasising that reassigning partitions is a > > different > > > > > > >> process than what happens when a topic is created, so not sure > > > > trying > > > > > to > > > > > > >> make it symmetric is beneficial. In addition to what was already > > > > > > >> discussed, > > > > > > >> one should also enable replication throttling before moving the > > > > data. > > > > > > > > > > > > > > > > > > > > > Thanks for your responses.The only advantage I can see from > > having > > > > > > > separate APIs for partition count change vs. the other changes is > > > > that > > > > > we > > > > > > > expect the former to return synchronously, and the latter to > > > > (usually) > > > > > > > return asynchronously. Lumping them into the same API forces > > > clients > > > > of > > > > > > > that API to code for the two possible cases. Is this the > > particular > > > > > > concern > > > > > > > you had in mind, or do you have others? > > > > > > > > > > > > > > > > > > > Just to help see what this looks like I've created another version > > of > > > > > > KIP-179 ( > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+ > > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient > > > > > > ). > > > > > > > > > > > > The AdminClient APIs are more type safe (harder to misuse). The > > > > > > asynchronous nature of the alterReplicationFactors() and > > > > > > reassignPartitions() methods aren't really apparent (you'd still > > have > > > > to > > > > > > read the javadocs to know you had to call replicaStatus() to > > > determine > > > > > > completion), but it's still better that the lumped-together API > > > because > > > > > the > > > > > > methods are _consistently_ async. The Protocol APIs are slightly > > > nicer > > > > > too, > > > > > > in that INVALID_REQUEST is less commonly returned. Another benefit > > is > > > > it > > > > > > would allow for these different alteration APIs to be modified > > > > > > independently in the future, should that necessary. > > > > > > > > > > > > What do others think? > > > > > > > > > > > > You're right that the proposal doesn't cover setting and clearing a > > > > > > > throttle, and it should. I will study the code about how this > > works > > > > > > before > > > > > > > posting back about that. > > > > > > > > > > > > > > > > > > > AFAICS from the code the throttle is simply a dynamic config of the > > > > > broker. > > > > > > We already have the AdminClient.alterConfigs API for setting this, > > so > > > > the > > > > > > refactoring of kafka-reassign-partitions.sh could just use that > > > > existing > > > > > > API. We could still achieve your objective of setting the throttle > > > > before > > > > > > reassignment by making the --throttle option mandatory when > > --execute > > > > was > > > > > > present. Or did you have something else in mind? > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Tom > > > > > > > > > > > > > > > > > > > >