On Tue, Sep 5, 2017, at 09:39, Tom Bentley wrote:
> I've revised this KIP again:
> 
> * Change the alterPartitionCounts() API to support passing an optional
> assignment for the new partitions (which is already supported by
> kafka-topics.sh). At the same time I didn't want the API to suggest it
> was
> possible to change the existing assignments in the same call (which isn't
> supported today).
> 
> * Removed alterReplicationFactor(), since it's really just a special case
> of reassignPartitions(): In both cases it is necessary to give a complete
> partition assignment and both can be long running due to data movement.

Thanks, Tom.

Do we expect that reducing the number of partitions will ever be
supported by this API?  It seems like decreasing would require a
different API-- one which supported data movement, had a "check status
of this operation" feature, etc. etc.  If this API is only ever going to
be used to increase the number of partitions, I think we should just
call it "increasePartitionCount" to avoid confusion.

>
> Outstanding questions:
> 
> 1. Is the proposed alterInterReplicaThrottle() API really better than
> changing the throttle via alterConfigs()?

That's a good point.  I would argue that we should just use alterConfigs
to set the broker configuration, rather than having a special RPC just
for this.  

> It wouldn't be necessary to
> support alterConfigs() for all broker configs, just DynamicConfigs (which
> can't be set via the config file any way).

Hmm.  For this purpose, it's not even necessary to support alterConfigs
for all dynamic configurations.  You could just support alterConfigs on
this particular dynamic configuration key.  It might be better to be
conservative here, since exposing all of our internals could lead to
compatibility problems later on.

> Would it be a problem that
> triggering the reassignment required ClusterAction on the Cluster, but
> throttling the assignment required Alter on the Topic? What if a user had
> the former permission, but not the latter?

We've been trying to reserve ClusterAction on Cluster for
broker-initiated operations.  Alter on Cluster is the ACL for "root
stuff" and I would argue that it should be what we use here.

For reconfiguring the broker, I think we should follow KIP-133 and use
AlterConfigs on the Broker resource.  (Of course, if you just use the
existing alterConfigs call, you get this without any special effort.)

> 
> 2. Is reassignPartitions() really the best name? I find the distinction
> between reassigning and just assigning to be of limited value, and
> "reassign" is misleading when the APIs now used for changing the
> replication factor too. Since the API is asynchronous/long running, it
> might be better to indicate that in the name some how. What about
> startPartitionAssignment()?

Good idea -- I like the idea of using "start" or "initiate" to indicate
that this is kicking off a long-running operation.

"reassign" seemed like a natural choice to me since this is changing an
existing assignment.  It was assigned (when the topic it was created)--
now it's being re-assigned.  If you change it to just "assign" then it
feels confusing to me.  A new user might ask if "assigning partitions"
is a step that they should take after creating a topic, similar to how
subscribing to topics is a step you take after creating a consumer.

Basically, "reassign" makes it clear that it is changing an assignment
that already exists.  "assign" leaves open the possibility that no
assignment exists (which of course we know is not true, but it would be
nice if the name reflected that.)

> On 30 August 2017 at 16:17, Tom Bentley <t.j.bent...@gmail.com> wrote:
> 
> > Hi all,
> >
> > I've updated the KIP as follows:
> >
> > * remove the APIs supporting progress reporting in favour of the APIs
> > being implemented in KIP-113.
> > * added some APIs to cover the existing functionality around throttling
> > inter-broker transfers, which was previously a TODO.
> >
> > To respond to Colin's suggestion:
> >
> > > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> > >                                   Map<String, TopicAlteration> alters)
> > >
> > > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> > >
> > > ReplicationFactorAlteration(int repl) implements TopicAlteration
> > >
> > > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > That is a workable alternative to providing 3 separate methods on the
> > AdminClient, but I don't see why it is objectively better. I don't see
> > clients commonly needing to do a mixture of alterations, and it assumes
> > that the options make sense for all alterations.

Yeah-- it was just an idea I had for an alternate API that grouped
things a little bit more.  I agree that it does constrain us to using
only options that make sense for all the operations, which is not ideal.
 So your existing split into separate functions probably makes more
sense.

cheers,
Colin

> >
> > Cheers,
> >
> > Tom
> >
> > On 22 August 2017 at 23:49, Colin McCabe <cmcc...@apache.org> wrote:
> >
> >> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> >> > Hi Dong and Jun,
> >> >
> >> > Thanks again for your input in this discussion and on KIP-113. It's
> >> > difficult that discussion is split between this thread and the one for
> >> > KIP-113, but I'll try to respond on this thread to questions asked on
> >> > this
> >> > thread.
> >> >
> >> > It seems there is some consensus that the alterTopic() API is the wrong
> >> > thing, and it would make more sense to separate the different kinds of
> >> > alteration into separate APIs. It seems to me there is then a choice.
> >> >
> >> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> >> > reassignPartitions() methods. This would most closely match the
> >> > facilities
> >> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
> >> > 2. Just provide a reassignPartitions() method and infer from the shape
> >> of
> >> > the data passed to that that a change in replication factor and/or
> >> > partition count is required, as suggested by Dong.
> >> >
> >> > The choice we make here is also relevant to KIP-113 and KIP-178. By
> >> > choosing (1) we can change the replication factor or partition count
> >> > without providing an assignment and therefore are necessarily requiring
> >> > the
> >> > controller to make a decision for us about which broker (and, for 113,
> >> > which log directory and thus which disk) the replica should be reside.
> >> > There is then the matter of what criteria the controller should use to
> >> > make
> >> > that decision (a decision is also required on topic creation of course,
> >> > but
> >> > I'll try not to go there right now).
> >>
> >> Hmm.  One approach would be to have something like
> >>
> >> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >> >                                   Map<String, TopicAlteration> alters)
> >> >
> >> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >> >
> >> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >> >
> >> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >>
> >>
> >> >
> >> > Choosing (2), on the other hand, forces us to make an assignment, and
> >> > currently the AdminClient doesn't provide the APIs necessary to make a
> >> > very
> >> > informed decision. To do the job properly we'd need APIs to enumerate
> >> the
> >> > permissible log directories on each broker, know the current disk usage
> >> > etc. These just don't exist today, and I think it would be a lot of work
> >> > to
> >> > bite off to specify them. We've already got three KIPs on the go.
> >> >
> >> > So, for the moment, I think we have to choose 1, for pragmatic reasons.
> >> > There is nothing to stop us deprecating alterPartitionCount() and
> >> > alterReplicationFactor() and moving to (2) at a later date.
> >>
> >> Yeah, the API should be pragmatic.  An API that doesn't let us do what
> >> we want to do (like let Kafka assign us a new replica on the least
> >> loaded node) is not good.
> >>
> >> >
> >> > To answer a specific questions of Jun's:
> >> >
> >> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> >
> >> >
> >> > It corresponds to the ReplicaStatus.statusTime, and the idea was it was
> >> > the
> >> > epoch offset at which the controllers notion of the lag was current
> >> > (which
> >> > I think is the epoch offset of the last FetchRequest from the follower).
> >> > At
> >> > one point I was considering some other ways of determining progress and
> >> > completion, and TBH I think it was more pertinent to those rejected
> >> > alternatives.
> >> >
> >> > I've already made some changes to KIP-179. As suggested in the thread
> >> for
> >> > KIP-113, I will be changing some more since I think we can
> >> > DescribeDirsRequest
> >> > instead of ReplicaStatusResponse to implement the --progress option.
> >>
> >> That makes sense.  Whatever response is used, it would be nice to have
> >> an error message in addition to an error code.
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> >
> >> >
> >> > On 9 August 2017 at 02:28, Jun Rao <j...@confluent.io> wrote:
> >> >
> >> > > Hi, Tom,
> >> > >
> >> > > Thanks for the KIP. A few minor comments below.
> >> > >
> >> > > 1. Implementation wise, the broker handles adding partitions
> >> differently
> >> > > from changing replica assignment. For the former, we directly update
> >> the
> >> > > topic path in ZK with the new partitions. For the latter, we write
> >> the new
> >> > > partition reassignment in the partition reassignment path. Changing
> >> the
> >> > > replication factor is handled in the same way as changing replica
> >> > > assignment. So, it would be useful to document how the broker handles
> >> these
> >> > > different cases accordingly. I think it's simpler to just allow one
> >> change
> >> > > (partition, replication factor, or replica assignment) in a request.
> >> > >
> >> > > 2. Currently, we only allow adding partitions. We probably want to
> >> document
> >> > > the restriction in the api.
> >> > >
> >> > > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Tom,
> >> > > >
> >> > > > Thanks for your reply. Here are my thoughts:
> >> > > >
> >> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
> >> query
> >> > > the
> >> > > > lag of follower replica as well. Here is how it works:
> >> > > >
> >> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
> >> > > follower
> >> > > > of the partition.
> >> > > > - DescribeDirsResponse from both leader and follower shows the LEO
> >> of the
> >> > > > leader replica and the follower replica.
> >> > > > - Lag of the follower replica is the difference in the LEO between
> >> leader
> >> > > > and follower.
> >> > > >
> >> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to
> >> be
> >> > > send
> >> > > > to each replica of the partition whereas ReplicaStatusRequest only
> >> needs
> >> > > to
> >> > > > be sent to the leader of the partition. It doesn't seem to make much
> >> > > > difference though. In practice we probably want to query the
> >> replica lag
> >> > > of
> >> > > > many partitions and AminClient needs to send exactly one request to
> >> each
> >> > > > broker with either solution. Does this make sense?
> >> > > >
> >> > > >
> >> > > > 2) KIP-179 proposes to add the following AdminClient API:
> >> > > >
> >> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
> >> AlterTopicsOptions
> >> > > > options)
> >> > > >
> >> > > > Where AlteredTopic includes the following fields
> >> > > >
> >> > > > AlteredTopic(String name, int numPartitions, int replicationFactor,
> >> > > > Map<Integer,List<Integer>> replicasAssignment)
> >> > > >
> >> > > > I have two comments on this:
> >> > > >
> >> > > > - The information in AlteredTopic seems a bit redundant. Both
> >> > > numPartitions
> >> > > > and replicationFactor can be derived from replicasAssignment. I
> >> think we
> >> > > > can probably just use the following API in AdminClient instead:
> >> > > >
> >> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> >> > > > partitionAssignment, AlterTopicsOptions options)
> >> > > >
> >> > > > - Do you think "reassignPartitions" may be a better name than
> >> > > > "alterTopics"? This is more consistent with the existing name used
> >> in
> >> > > > kafka-reassign-partitions.sh and I also find it to be more
> >> accurate. On
> >> > > the
> >> > > > other hand, alterTopics seems to suggest that we can also alter
> >> topic
> >> > > > config.
> >> > > >
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t.j.bent...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Thanks for your reply.
> >> > > > >
> >> > > > > You're right that your DescribeDirsResponse contains appropriate
> >> data.
> >> > > > The
> >> > > > > comment about the log_end_offset in the KIP says "Enable user to
> >> track
> >> > > > > movement progress by comparing LEO of the *.log and *.move". That
> >> makes
> >> > > > me
> >> > > > > wonder whether this would only work for replica movement on the
> >> same
> >> > > > > broker. In the case of reassigning partitions between brokers
> >> it's only
> >> > > > > really the leader for the partition that knows the max LEO of the
> >> ISR
> >> > > and
> >> > > > > the LEO of the target broker. Maybe the comment is misleading and
> >> it
> >> > > > would
> >> > > > > be the leader doing the same thing in both cases. Can you clarify
> >> this?
> >> > > >
> >> > > >
> >> > > > > At a conceptual level there's a lot of similarity between KIP-113
> >> and
> >> > > > > KIP-179 because they're both about moving replicas around.
> >> Concretely
> >> > > > > though, ChangeReplicaDirRequest assumes the movement is on the
> >> same
> >> > > > broker,
> >> > > > > while the equivalent APIs in KIP-179 (whichever alternative) lack
> >> the
> >> > > > > notion of disks. I wonder if a unified API would be better or not.
> >> > > > > Operationally, the reasons for changing replicas between brokers
> >> are
> >> > > > likely
> >> > > > > to be motivated by balancing load across the cluster, or
> >> > > adding/removing
> >> > > > > brokers. But the JBOD use case has different motivations. So I
> >> suspect
> >> > > > it's
> >> > > > > OK that the APIs are separate, but I'd love to hear what others
> >> think.
> >> > > >
> >> > > >
> >> > > > > I think you're right about TopicPartitionReplica. At the protocol
> >> level
> >> > > > we
> >> > > > > could group topics and partitions together so avoid having the
> >> same
> >> > > topic
> >> > > > > name multiple times when querying for the status of all the
> >> partitions
> >> > > > of a
> >> > > > > topic.
> >> > > > >
> >> > > > > Thanks again for taking the time to respond.
> >> > > > >
> >> > > > > Tom
> >> > > > >
> >> > > > > On 3 August 2017 at 23:07, Dong Lin <lindon...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hey Tom,
> >> > > > > >
> >> > > > > > Thanks for the KIP. It seems that the prior discussion in this
> >> thread
> >> > > > has
> >> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
> >> looked
> >> > > > into
> >> > > > > > this yet. I have two comments on the replicaStatus() API and the
> >> > > > > > ReplicaStatusRequest:
> >> > > > > >
> >> > > > > > -  It seems that the use-case for ReplicaStatusRequest is
> >> covered by
> >> > > > > > the DescribeDirsRequest introduced in KIP-113
> >> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > > > > 113%3A+Support+replicas+movement+between+log+directories>.
> >> > > > > > DescribeDirsResponse contains the logEndOffset for the specified
> >> > > > > > partitions. The the lag of the follower replica can be derived
> >> as the
> >> > > > > > difference between leader's LEO and follower's LEO. Do you
> >> think we
> >> > > can
> >> > > > > > simply use DescribeDirsRequest without introducing
> >> > > > ReplicaStatusRequest?
> >> > > > > >
> >> > > > > > - According to the current KIP, replicaStatus() takes a list of
> >> > > > > partitions
> >> > > > > > as input. And its output maps the partition to a list of replica
> >> > > > status.
> >> > > > > Do
> >> > > > > > you think it would be better to have a new class called
> >> > > > > > TopicPartitionReplica, which identifies the topic, partition
> >> and the
> >> > > > > > brokerId. replicaStatus can then take a list of
> >> TopicPartitionReplica
> >> > > > as
> >> > > > > > input. And its output maps the replica to replica status. The
> >> latter
> >> > > > API
> >> > > > > > seems simpler and also matches the method name better. What do
> >> you
> >> > > > think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
> >> t.j.bent...@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi again Ismael,
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > 1. It's worth emphasising that reassigning partitions is a
> >> > > different
> >> > > > > > > >> process than what happens when a topic is created, so not
> >> sure
> >> > > > > trying
> >> > > > > > to
> >> > > > > > > >> make it symmetric is beneficial. In addition to what was
> >> already
> >> > > > > > > >> discussed,
> >> > > > > > > >> one should also enable replication throttling before
> >> moving the
> >> > > > > data.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Thanks for your responses.The only advantage I can see from
> >> > > having
> >> > > > > > > > separate APIs for partition count change vs. the other
> >> changes is
> >> > > > > that
> >> > > > > > we
> >> > > > > > > > expect the former to return synchronously, and the latter to
> >> > > > > (usually)
> >> > > > > > > > return asynchronously. Lumping them into the same API forces
> >> > > > clients
> >> > > > > of
> >> > > > > > > > that API to code for the two possible cases. Is this the
> >> > > particular
> >> > > > > > > concern
> >> > > > > > > > you had in mind, or do you have others?
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > Just to help see what this looks like I've created another
> >> version
> >> > > of
> >> > > > > > > KIP-179 (
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> >> > > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+
> >> AdminClient
> >> > > > > > > ).
> >> > > > > > >
> >> > > > > > > The AdminClient APIs are more type safe (harder to misuse).
> >> The
> >> > > > > > > asynchronous nature of the alterReplicationFactors() and
> >> > > > > > > reassignPartitions() methods aren't really apparent (you'd
> >> still
> >> > > have
> >> > > > > to
> >> > > > > > > read the javadocs to know you had to call replicaStatus() to
> >> > > > determine
> >> > > > > > > completion), but it's still better that the lumped-together
> >> API
> >> > > > because
> >> > > > > > the
> >> > > > > > > methods are _consistently_ async. The Protocol APIs are
> >> slightly
> >> > > > nicer
> >> > > > > > too,
> >> > > > > > > in that INVALID_REQUEST is less commonly returned. Another
> >> benefit
> >> > > is
> >> > > > > it
> >> > > > > > > would allow for these different alteration APIs to be modified
> >> > > > > > > independently in the future, should that necessary.
> >> > > > > > >
> >> > > > > > > What do others think?
> >> > > > > > >
> >> > > > > > > You're right that the proposal doesn't cover setting and
> >> clearing a
> >> > > > > > > > throttle, and it should. I will study the code about how
> >> this
> >> > > works
> >> > > > > > > before
> >> > > > > > > > posting back about that.
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > AFAICS from the code the throttle is simply a dynamic config
> >> of the
> >> > > > > > broker.
> >> > > > > > > We already have the AdminClient.alterConfigs API for setting
> >> this,
> >> > > so
> >> > > > > the
> >> > > > > > > refactoring of kafka-reassign-partitions.sh could just use
> >> that
> >> > > > > existing
> >> > > > > > > API. We could still achieve your objective of setting the
> >> throttle
> >> > > > > before
> >> > > > > > > reassignment by making the --throttle option mandatory when
> >> > > --execute
> >> > > > > was
> >> > > > > > > present. Or did you have something else in mind?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Tom
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> >
> >

Reply via email to