In the specific group information, can we also return information like the partition assignment for each member, and the lag/offset of each member/partition? It would be useful for Ops/Admin to gauge the health of the consumer group.
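A rough sketch of what such a per-member view could look like. Every class and field name below is made up for illustration and none of it comes from the KIP; lag is assumed here to be the log end offset minus the committed offset, floored at zero.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical types for illustration; none of these names come from the KIP.
final class PartitionLag {
    final String topic;
    final int partition;
    final long committedOffset; // last offset committed by the group
    final long logEndOffset;    // current end of the partition log

    PartitionLag(String topic, int partition, long committedOffset, long logEndOffset) {
        this.topic = topic;
        this.partition = partition;
        this.committedOffset = committedOffset;
        this.logEndOffset = logEndOffset;
    }

    // Assumption: lag = log end offset - committed offset, never negative.
    long lag() {
        return Math.max(0L, logEndOffset - committedOffset);
    }
}

final class MemberAssignment {
    final String memberId;
    final String clientHost;
    final List<PartitionLag> partitions;

    MemberAssignment(String memberId, String clientHost, List<PartitionLag> partitions) {
        this.memberId = memberId;
        this.clientHost = clientHost;
        this.partitions = new ArrayList<>(partitions); // defensive copy
    }

    // Sum of the lag over every partition assigned to this member.
    long totalLag() {
        long sum = 0L;
        for (PartitionLag p : partitions) {
            sum += p.lag();
        }
        return sum;
    }
}

final class GroupLagReport {
    // Maps each memberId to its total lag, preserving member order.
    static Map<String, Long> lagByMember(List<MemberAssignment> members) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (MemberAssignment m : members) {
            result.put(m.memberId, m.totalLag());
        }
        return result;
    }
}
```

An ops tool could then alert on any member whose total lag stays above some threshold, which is the kind of health check meant above.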
Regards,
-- Jianbin

> On Feb 6, 2017, at 13:54, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Some follow-up on 2) / 3) below.
>
> On Mon, Feb 6, 2017 at 11:21 AM, Colin McCabe <cmcc...@apache.org> wrote:
>
>> On Fri, Feb 3, 2017, at 16:25, Guozhang Wang wrote:
>>> Thanks for the proposal Colin. A few comments below:
>>
>> Thanks for taking a look, Guozhang.
>>
>>> 1. There are a couple of classes that look new to me but are not defined
>>> anywhere. For example: NewTopic (topic name and configs?), TopicInfo (is
>>> this a wrapper of MetadataResponse.TopicMetadata?), NodeApiVersions,
>>> GroupOverview.
>>> Could you provide their class definitions?
>>
>> Good point. I will add them in the KIP.
>>
>> NodeApiVersions is at
>> ./clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
>>
>>> 2. In Streams, we would like to replace its own
>>> `org.apache.kafka.streams.processor.internals.StreamsKafkaClient` class
>>> with this new admin client. One additional request though, is that for
>>> create / delete topics, we'd like to use a different "flag" as BLOCKING,
>>> which means the response will not be sent back until the controller has
>>> updated its own metadata cache for the topic, and even STRICT_BLOCKING,
>>> which means the response will not be sent back until the metadata has
>>> been propagated to the whole cluster.
>>
>> Hmm. It seems like this would require additional RPCs or changes to
>> existing RPCs on the server. So we should handle this in a follow-on
>> KIP, I think.
>>
> I agree for STRICT_BLOCKING, while for BLOCKING, it is already supported as
> of today I think, and Streams' KafkaClient is using that mechanism as well.
>
>>> 3. I'm wondering what's the usage of "public Map<Node,
>>> Try<List<GroupOverview>>> getAllGroups()", or rather, would it be more
>>> useful to get a specific group information given the group id?
>>> Otherwise we need to query each one of the coordinators.
>>
>> That's a good point. We should have an API that gets information about
>> a specific group, by querying only the coordinator for that group. By
>> the way, what specific group information should we expose, besides name
>> and protocolType?
>>
> I think these can all be returned?
>
> (groupID, protocolType, generationID, state, members: [memberID,
> clientHost], leaderID (nullable) )
>
>>> 4. I'm +1 with Ismael's suggestion for having the AdminClient interface
>>> with a KafkaAdminClient impl, this at least allows easier mocks for unit
>>> testing.
>>
>> Yeah, I agree. Hopefully that will also make it a little clearer what
>> the boundary is between the internal functions and classes and the
>> public API. I'll update the KIP accordingly.
>>
>> thanks,
>> Colin
>>
>>> Guozhang
>>>
>>> On Fri, Feb 3, 2017 at 10:40 AM, Colin McCabe <cmcc...@apache.org> wrote:
>>>
>>>> On Thu, Feb 2, 2017, at 15:02, Ismael Juma wrote:
>>>>> Hi Colin,
>>>>>
>>>>> Thanks for the KIP, great to make progress on this. I have some initial
>>>>> comments, will post more later:
>>>>>
>>>>> 1. We have KafkaProducer that implements the Producer interface and
>>>>> KafkaConsumer that implements the Consumer interface. Maybe we could
>>>>> have KafkaAdminClient that implements the AdminClient interface? Or
>>>>> maybe just KafkaAdmin. Not sure, some ideas for consideration. Also, I
>>>>> don't think we should worry about a name clash with the internal
>>>>> AdminClient written in Scala. That will go away soon enough and
>>>>> choosing a good name for the public class is what matters.
>>>>
>>>> Hi Ismael,
>>>>
>>>> Thanks for taking a look.
>>>>
>>>> I guess my thought process was that users might find it confusing if the
>>>> public API and the old private API had the same name.
"What do you >>>> mean, I have to upgrade to release X to get AdminClient, I have it >> right >>>> here?" I do have a slight preference for the shorter name, though, so >>>> if this isn't a worry, we can change it to AdminClient. >>>> >>>>> >>>>> 2. We should include the proposed package name in the KIP >>>>> (probably org.apache.kafka.clients.admin?). >>>> >>>> Good idea. I will add the package name to the KIP. >>>> >>>>> >>>>> 3. It would be good to list the supported configs. >>>> >>>> OK >>>> >>>>> >>>>> 4. KIP-107, which passed the vote, specifies the introduction of a >> method >>>>> to AdminClient with the following signature. We should figure out >> how it >>>>> would look given this proposal. >>>>> >>>>> Future<Map<TopicPartition, PurgeDataResult>> >>>>> purgeDataBefore(Map<TopicPartition, Long> offsetForPartition) >>>>> >>>>> 5. I am not sure about rejecting the Futures-based API. I think I >> would >>>>> prefer that, personally. Grant had an interesting idea of not >> exposing >>>>> the >>>>> batch methods in the AdminClient to start with to keep it simple and >>>>> relying on a Future based API to make it easy for users to run things >>>>> concurrently. This is consistent with the producer... >>>> >>>> So, there are two ways that an operation can be "async" here which are >>>> very separate. >>>> >>>> There is "async on the server." This basically means that we tell the >>>> server to do something and don't wait for a confirmation that it >>>> succeeded. For example, in the current proposal, users can call >>>> createTopic(new Topic(...), CreateTopicFlags.NONBLOCKING). The call >>>> will wait for the server to get the request, which will go into >>>> purgatory. Later on, the request may succeed or fail, but the client >>>> won't know either way. In RPC terms, this means we set the timeout >>>> value to 0. >>>> >>>> Then there is "async on the client." 
>>>> This just means that the client
>>>> thread doesn't block-- instead, it gets back a Future (or similar
>>>> object). What this boils down to in terms of implementation is that a
>>>> message gets put on some queue somewhere and the client thread continues
>>>> running.
>>>>
>>>> "async on the client" tends to be good when you want to churn out a ton
>>>> of requests without using lots of threads. However, it is a more
>>>> confusing mental model for most programmers.
>>>>
>>>> You can easily translate a Futures-based API into a blocking API by
>>>> having blocking shims that just create the Future and call get().
>>>> Similarly, you can transform a blocking API into a Futures-based API by
>>>> using a thread pool. Thread pools use resources, though, whereas having
>>>> function shims does not.
>>>>
>>>> I haven't seen any discussion here about what we gain here by using a
>>>> Futures-based API. It makes sense to use Futures in the Producer, since
>>>> they're more flexible, and users are potentially creating lots and lots
>>>> of messages. I'm not sure if users would want to do lots and lots of
>>>> admin operations with a single thread. I'd be curious to hear a little
>>>> more from potential end-users about what API would be most flexible and
>>>> usable for them. I'm open to ideas.
>>>>
>>>> best,
>>>> Colin
>>>>
>>>>> Ismael
>>>>>
>>>>> On Thu, Feb 2, 2017 at 6:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I wrote up a Kafka improvement proposal for adding an
>>>>>> AdministrativeClient interface. This is a continuation of the work on
>>>>>> KIP-4 towards centralized administrative operations. Please check out
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdministrativeClient+API+for+Kafka+admin+operations
>>>>>>
>>>>>> regards,
>>>>>> Colin
>>>
>>> --
>>> -- Guozhang
>
> --
> -- Guozhang
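
On the Futures-versus-blocking point in Colin's quoted message: a minimal sketch of the two conversions he describes, using only java.util.concurrent. The FutureAdmin and BlockingAdmin interfaces and the AdminShims helper are hypothetical stand-ins invented for this example; they are not the API proposed in the KIP.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical interfaces for illustration only; not the KIP-117 API.
interface FutureAdmin {
    Future<String> createTopic(String name);
}

interface BlockingAdmin {
    String createTopic(String name) throws Exception;
}

final class AdminShims {
    // Futures -> blocking: the shim creates the Future and calls get().
    // No extra threads are needed for this direction.
    static BlockingAdmin blocking(FutureAdmin admin) {
        return name -> admin.createTopic(name).get();
    }

    // Blocking -> Futures: each call is handed to a thread pool, so a
    // pool thread sits blocked for every call that is in flight.
    static FutureAdmin async(BlockingAdmin admin, ExecutorService pool) {
        return name -> {
            Callable<String> task = () -> admin.createTopic(name);
            return pool.submit(task);
        };
    }
}
```

The asymmetry is visible in the signatures: blocking() needs nothing besides the wrapped client, while async() has to be given an ExecutorService whose threads are tied up while calls are outstanding.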