bq. What about startPartitionAssignment() ?

Make sense.
startPartitionReassignment() seems to be better since the API deals with
reassignment.

Cheers

On Tue, Sep 5, 2017 at 9:39 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:

> I've revised this KIP again:
>
> * Change the alterPartitionCounts() API to support passing an optional
> assignment for the new partitions (which is already supported by
> kafka-topics.sh). At the same time I didn't want the API to suggest it was
> possible to change the existing assignments in the same call (which isn't
> supported today).
>
> * Removed alterReplicationFactor(), since it's really just a special case
> of reassignPartitions(): In both cases it is necessary to give a complete
> partition assignment and both can be long running due to data movement.
>
>
> Outstanding questions:
>
> 1. Is the proposed alterInterReplicaThrottle() API really better than
> changing the throttle via alterConfigs()? It wouldn't be necessary to
> support alterConfigs() for all broker configs, just DynamicConfigs (which
> can't be set via the config file any way). Would it be a problem that
> triggering the reassignment required ClusterAction on the Cluster, but
> throttling the assignment required Alter on the Topic? What if a user had
> the former permission, but not the latter?
>
> 2. Is reassignPartitions() really the best name? I find the distinction
> between reassigning and just assigning to be of limited value, and
> "reassign" is misleading when the APIs now used for changing the
> replication factor too. Since the API is asynchronous/long running, it
> might be better to indicate that in the name some how. What about
> startPartitionAssignment()?
>
> I am, of course, interested in any other questions or comments people have.
>
>
>
>
> On 30 August 2017 at 16:17, Tom Bentley <t.j.bent...@gmail.com> wrote:
>
> > Hi all,
> >
> > I've updated the KIP as follows:
> >
> > * remove the APIs supporting progress reporting in favour of the APIs
> > being implemented in KIP-113.
> > * added some APIs to cover the existing functionality around throttling
> > inter-broker transfers, which was previously a TODO.
> >
> > To respond to Colin's suggestion:
> >
> > > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> > >                                   Map<String, TopicAlteration> alters)
> > >
> > > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> > >
> > > ReplicationFactorAlteration(int repl) implements TopicAlteration
> > >
> > > ReassignPartitionsAlteration(...) implements TopicAlteration
> >
> > That is a workable alternative to providing 3 separate methods on the
> > AdminClient, but I don't see why it is objectively better. I don't see
> > clients commonly needing to do a mixture of alterations, and it assumes
> > that the options make sense for all alterations.
> >
> > Cheers,
> >
> > Tom
> >
> > On 22 August 2017 at 23:49, Colin McCabe <cmcc...@apache.org> wrote:
> >
> >> On Wed, Aug 9, 2017, at 07:17, Tom Bentley wrote:
> >> > Hi Dong and Jun,
> >> >
> >> > Thanks again for your input in this discussion and on KIP-113. It's
> >> > difficult that discussion is split between this thread and the one for
> >> > KIP-113, but I'll try to respond on this thread to questions asked on
> >> > this
> >> > thread.
> >> >
> >> > It seems there is some consensus that the alterTopic() API is the
> wrong
> >> > thing, and it would make more sense to separate the different kinds of
> >> > alteration into separate APIs. It seems to me there is then a choice.
> >> >
> >> > 1. Have separate alterPartitionCount(), alterReplicationFactor() and
> >> > reassignPartitions() methods. This would most closely match the
> >> > facilities
> >> > currently offered by kafka-alter-topics and kafka-reassign-partitions.
> >> > 2. Just provide a reassignPartitions() method and infer from the shape
> >> of
> >> > the data passed to that that a change in replication factor and/or
> >> > partition count is required, as suggested by Dong.
> >> >
> >> > The choice we make here is also relevant to KIP-113 and KIP-178. By
> >> > choosing (1) we can change the replication factor or partition count
> >> > without providing an assignment and therefore are necessarily
> requiring
> >> > the
> >> > controller to make a decision for us about which broker (and, for 113,
> >> > which log directory and thus which disk) the replica should be reside.
> >> > There is then the matter of what criteria the controller should use to
> >> > make
> >> > that decision (a decision is also required on topic creation of
> course,
> >> > but
> >> > I'll try not to go there right now).
> >>
> >> Hmm.  One approach would be to have something like
> >>
> >> > TopicAlterationResult alterTopics(TopicAlterationOptions options,
> >> >                                   Map<String, TopicAlteration> alters)
> >> >
> >> > PartitionCountAlteration(int numPartitions) implements TopicAlteration
> >> >
> >> > ReplicationFactorAlteration(int repl) implements TopicAlteration
> >> >
> >> > ReassignPartitionsAlteration(...) implements TopicAlteration
> >>
> >>
> >> >
> >> > Choosing (2), on the other hand, forces us to make an assignment, and
> >> > currently the AdminClient doesn't provide the APIs necessary to make a
> >> > very
> >> > informed decision. To do the job properly we'd need APIs to enumerate
> >> the
> >> > permissible log directories on each broker, know the current disk
> usage
> >> > etc. These just don't exist today, and I think it would be a lot of
> work
> >> > to
> >> > bite off to specify them. We've already got three KIPs on the go.
> >> >
> >> > So, for the moment, I think we have to choose 1, for pragmatic
> reasons.
> >> > There is nothing to stop us deprecating alterPartitionCount() and
> >> > alterReplicationFactor() and moving to (2) at a later date.
> >>
> >> Yeah, the API should be pragmatic.  An API that doesn't let us do what
> >> we want to do (like let Kafka assign us a new replica on the least
> >> loaded node) is not good.
> >>
> >> >
> >> > To answer a specific questions of Jun's:
> >> >
> >> > 3. It's not very clear to me what status_time in ReplicaStatusResponse
> >> > > means.
> >> >
> >> >
> >> > It corresponds to the ReplicaStatus.statusTime, and the idea was it
> was
> >> > the
> >> > epoch offset at which the controllers notion of the lag was current
> >> > (which
> >> > I think is the epoch offset of the last FetchRequest from the
> follower).
> >> > At
> >> > one point I was considering some other ways of determining progress
> and
> >> > completion, and TBH I think it was more pertinent to those rejected
> >> > alternatives.
> >> >
> >> > I've already made some changes to KIP-179. As suggested in the thread
> >> for
> >> > KIP-113, I will be changing some more since I think we can
> >> > DescribeDirsRequest
> >> > instead of ReplicaStatusResponse to implement the --progress option.
> >>
> >> That makes sense.  Whatever response is used, it would be nice to have
> >> an error message in addition to an error code.
> >>
> >> best,
> >> Colin
> >>
> >> >
> >> > Cheers,
> >> >
> >> > Tom
> >> >
> >> >
> >> >
> >> > On 9 August 2017 at 02:28, Jun Rao <j...@confluent.io> wrote:
> >> >
> >> > > Hi, Tom,
> >> > >
> >> > > Thanks for the KIP. A few minor comments below.
> >> > >
> >> > > 1. Implementation wise, the broker handles adding partitions
> >> differently
> >> > > from changing replica assignment. For the former, we directly update
> >> the
> >> > > topic path in ZK with the new partitions. For the latter, we write
> >> the new
> >> > > partition reassignment in the partition reassignment path. Changing
> >> the
> >> > > replication factor is handled in the same way as changing replica
> >> > > assignment. So, it would be useful to document how the broker
> handles
> >> these
> >> > > different cases accordingly. I think it's simpler to just allow one
> >> change
> >> > > (partition, replication factor, or replica assignment) in a request.
> >> > >
> >> > > 2. Currently, we only allow adding partitions. We probably want to
> >> document
> >> > > the restriction in the api.
> >> > >
> >> > > 3. It's not very clear to me what status_time in
> ReplicaStatusResponse
> >> > > means.
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Aug 4, 2017 at 10:04 AM, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Tom,
> >> > > >
> >> > > > Thanks for your reply. Here are my thoughts:
> >> > > >
> >> > > > 1) I think the DescribeDirsResponse can be used by AdminClient to
> >> query
> >> > > the
> >> > > > lag of follower replica as well. Here is how it works:
> >> > > >
> >> > > > - AdminClient sends DescribeDirsRequest to both the leader and the
> >> > > follower
> >> > > > of the partition.
> >> > > > - DescribeDirsResponse from both leader and follower shows the LEO
> >> of the
> >> > > > leader replica and the follower replica.
> >> > > > - Lag of the follower replica is the difference in the LEO between
> >> leader
> >> > > > and follower.
> >> > > >
> >> > > > In comparison to ReplicaStatusRequest, DescribeDirsRequest needs
> to
> >> be
> >> > > send
> >> > > > to each replica of the partition whereas ReplicaStatusRequest only
> >> needs
> >> > > to
> >> > > > be sent to the leader of the partition. It doesn't seem to make
> much
> >> > > > difference though. In practice we probably want to query the
> >> replica lag
> >> > > of
> >> > > > many partitions and AminClient needs to send exactly one request
> to
> >> each
> >> > > > broker with either solution. Does this make sense?
> >> > > >
> >> > > >
> >> > > > 2) KIP-179 proposes to add the following AdminClient API:
> >> > > >
> >> > > > alterTopics(Collection<AlteredTopic> alteredTopics,
> >> AlterTopicsOptions
> >> > > > options)
> >> > > >
> >> > > > Where AlteredTopic includes the following fields
> >> > > >
> >> > > > AlteredTopic(String name, int numPartitions, int
> replicationFactor,
> >> > > > Map<Integer,List<Integer>> replicasAssignment)
> >> > > >
> >> > > > I have two comments on this:
> >> > > >
> >> > > > - The information in AlteredTopic seems a bit redundant. Both
> >> > > numPartitions
> >> > > > and replicationFactor can be derived from replicasAssignment. I
> >> think we
> >> > > > can probably just use the following API in AdminClient instead:
> >> > > >
> >> > > > AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
> >> > > > partitionAssignment, AlterTopicsOptions options)
> >> > > >
> >> > > > - Do you think "reassignPartitions" may be a better name than
> >> > > > "alterTopics"? This is more consistent with the existing name used
> >> in
> >> > > > kafka-reassign-partitions.sh and I also find it to be more
> >> accurate. On
> >> > > the
> >> > > > other hand, alterTopics seems to suggest that we can also alter
> >> topic
> >> > > > config.
> >> > > >
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <
> t.j.bent...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Thanks for your reply.
> >> > > > >
> >> > > > > You're right that your DescribeDirsResponse contains appropriate
> >> data.
> >> > > > The
> >> > > > > comment about the log_end_offset in the KIP says "Enable user to
> >> track
> >> > > > > movement progress by comparing LEO of the *.log and *.move".
> That
> >> makes
> >> > > > me
> >> > > > > wonder whether this would only work for replica movement on the
> >> same
> >> > > > > broker. In the case of reassigning partitions between brokers
> >> it's only
> >> > > > > really the leader for the partition that knows the max LEO of
> the
> >> ISR
> >> > > and
> >> > > > > the LEO of the target broker. Maybe the comment is misleading
> and
> >> it
> >> > > > would
> >> > > > > be the leader doing the same thing in both cases. Can you
> clarify
> >> this?
> >> > > >
> >> > > >
> >> > > > > At a conceptual level there's a lot of similarity between
> KIP-113
> >> and
> >> > > > > KIP-179 because they're both about moving replicas around.
> >> Concretely
> >> > > > > though, ChangeReplicaDirRequest assumes the movement is on the
> >> same
> >> > > > broker,
> >> > > > > while the equivalent APIs in KIP-179 (whichever alternative)
> lack
> >> the
> >> > > > > notion of disks. I wonder if a unified API would be better or
> not.
> >> > > > > Operationally, the reasons for changing replicas between brokers
> >> are
> >> > > > likely
> >> > > > > to be motivated by balancing load across the cluster, or
> >> > > adding/removing
> >> > > > > brokers. But the JBOD use case has different motivations. So I
> >> suspect
> >> > > > it's
> >> > > > > OK that the APIs are separate, but I'd love to hear what others
> >> think.
> >> > > >
> >> > > >
> >> > > > > I think you're right about TopicPartitionReplica. At the
> protocol
> >> level
> >> > > > we
> >> > > > > could group topics and partitions together so avoid having the
> >> same
> >> > > topic
> >> > > > > name multiple times when querying for the status of all the
> >> partitions
> >> > > > of a
> >> > > > > topic.
> >> > > > >
> >> > > > > Thanks again for taking the time to respond.
> >> > > > >
> >> > > > > Tom
> >> > > > >
> >> > > > > On 3 August 2017 at 23:07, Dong Lin <lindon...@gmail.com>
> wrote:
> >> > > > >
> >> > > > > > Hey Tom,
> >> > > > > >
> >> > > > > > Thanks for the KIP. It seems that the prior discussion in this
> >> thread
> >> > > > has
> >> > > > > > focused on reassigning partitions (or AlterTopics). I haven't
> >> looked
> >> > > > into
> >> > > > > > this yet. I have two comments on the replicaStatus() API and
> the
> >> > > > > > ReplicaStatusRequest:
> >> > > > > >
> >> > > > > > -  It seems that the use-case for ReplicaStatusRequest is
> >> covered by
> >> > > > > > the DescribeDirsRequest introduced in KIP-113
> >> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > > > > 113%3A+Support+replicas+movement+between+log+directories>.
> >> > > > > > DescribeDirsResponse contains the logEndOffset for the
> specified
> >> > > > > > partitions. The the lag of the follower replica can be derived
> >> as the
> >> > > > > > difference between leader's LEO and follower's LEO. Do you
> >> think we
> >> > > can
> >> > > > > > simply use DescribeDirsRequest without introducing
> >> > > > ReplicaStatusRequest?
> >> > > > > >
> >> > > > > > - According to the current KIP, replicaStatus() takes a list
> of
> >> > > > > partitions
> >> > > > > > as input. And its output maps the partition to a list of
> replica
> >> > > > status.
> >> > > > > Do
> >> > > > > > you think it would be better to have a new class called
> >> > > > > > TopicPartitionReplica, which identifies the topic, partition
> >> and the
> >> > > > > > brokerId. replicaStatus can then take a list of
> >> TopicPartitionReplica
> >> > > > as
> >> > > > > > input. And its output maps the replica to replica status. The
> >> latter
> >> > > > API
> >> > > > > > seems simpler and also matches the method name better. What do
> >> you
> >> > > > think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <
> >> t.j.bent...@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi again Ismael,
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > 1. It's worth emphasising that reassigning partitions is a
> >> > > different
> >> > > > > > > >> process than what happens when a topic is created, so not
> >> sure
> >> > > > > trying
> >> > > > > > to
> >> > > > > > > >> make it symmetric is beneficial. In addition to what was
> >> already
> >> > > > > > > >> discussed,
> >> > > > > > > >> one should also enable replication throttling before
> >> moving the
> >> > > > > data.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Thanks for your responses.The only advantage I can see
> from
> >> > > having
> >> > > > > > > > separate APIs for partition count change vs. the other
> >> changes is
> >> > > > > that
> >> > > > > > we
> >> > > > > > > > expect the former to return synchronously, and the latter
> to
> >> > > > > (usually)
> >> > > > > > > > return asynchronously. Lumping them into the same API
> forces
> >> > > > clients
> >> > > > > of
> >> > > > > > > > that API to code for the two possible cases. Is this the
> >> > > particular
> >> > > > > > > concern
> >> > > > > > > > you had in mind, or do you have others?
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > Just to help see what this looks like I've created another
> >> version
> >> > > of
> >> > > > > > > KIP-179 (
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> >> > > > > > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+
> >> AdminClient
> >> > > > > > > ).
> >> > > > > > >
> >> > > > > > > The AdminClient APIs are more type safe (harder to misuse).
> >> The
> >> > > > > > > asynchronous nature of the alterReplicationFactors() and
> >> > > > > > > reassignPartitions() methods aren't really apparent (you'd
> >> still
> >> > > have
> >> > > > > to
> >> > > > > > > read the javadocs to know you had to call replicaStatus() to
> >> > > > determine
> >> > > > > > > completion), but it's still better that the lumped-together
> >> API
> >> > > > because
> >> > > > > > the
> >> > > > > > > methods are _consistently_ async. The Protocol APIs are
> >> slightly
> >> > > > nicer
> >> > > > > > too,
> >> > > > > > > in that INVALID_REQUEST is less commonly returned. Another
> >> benefit
> >> > > is
> >> > > > > it
> >> > > > > > > would allow for these different alteration APIs to be
> modified
> >> > > > > > > independently in the future, should that necessary.
> >> > > > > > >
> >> > > > > > > What do others think?
> >> > > > > > >
> >> > > > > > > You're right that the proposal doesn't cover setting and
> >> clearing a
> >> > > > > > > > throttle, and it should. I will study the code about how
> >> this
> >> > > works
> >> > > > > > > before
> >> > > > > > > > posting back about that.
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > > AFAICS from the code the throttle is simply a dynamic config
> >> of the
> >> > > > > > broker.
> >> > > > > > > We already have the AdminClient.alterConfigs API for setting
> >> this,
> >> > > so
> >> > > > > the
> >> > > > > > > refactoring of kafka-reassign-partitions.sh could just use
> >> that
> >> > > > > existing
> >> > > > > > > API. We could still achieve your objective of setting the
> >> throttle
> >> > > > > before
> >> > > > > > > reassignment by making the --throttle option mandatory when
> >> > > --execute
> >> > > > > was
> >> > > > > > > present. Or did you have something else in mind?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Tom
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> >
> >
>

Reply via email to