Andrew/Jose,

I like the suggested Flow API. It's also similar to the stream observers in gRPC. I'm not sure we should expose something as complex as the Flow API directly in KafkaAdminClient, but certainly we can provide a similar interface.

---

Cancellations:

Another thing not yet discussed is how to cancel in-flight requests. For other calls in KafkaAdminClient, we use KafkaFuture which has a "cancel" method. With the callback approach, we need to be able to cancel the request from within the callback as well as externally. Looking to the Flow API again for inspiration, we could have the admin client pass an object to the callback which can be used for cancellation. In the simple case, users can ignore this object. In the advanced case, they can create a concrete class for the callback and cache the cancellation object so it can be accessed externally. This would be similar to the Subscription in the Flow API.

---

Topics / Partitions:

For the case of topic descriptions, we actually have two data types interleaved in one stream (topics and partitions). This means if we go with TopicDescription in the "onNext" method, we will have a partial set of topics in some cases. Also, we will end up calling "onNext" more than once for each RPC in the case that a single RPC response spans multiple topics.

One alternative to a single "onNext" would be an interface more tailored to the RPC like:

interface DescribeTopicsStreamObserver {
  // Called for each topic in the result stream.
  void onTopic(TopicInfo topic);

  // Called for each partition of the topic last handled by onTopic
  void onPartition(TopicPartitionInfo partition);

  // Called once the broker has finished streaming results to the admin client.
  // This marks the end of the stream.
  void onComplete();

  // Called if an error occurs on the underlying stream. This marks the end of the stream.
  void onError(Throwable t);
}

---

Consumer API:

Offline, there was some discussion about using a simple SAM consumer-like interface:

interface AdminResultsConsumer<T> {
  void onNext(T next, Throwable t);
}

This has the benefit of being quite simple and letting callers supply a lambda instead of a full anonymous class definition.
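
To make the lambda point concrete, here is a rough sketch of what calling code could look like. FakeTopicSource is a hypothetical stand-in for the admin client, and the rule that exactly one argument is non-null per call is my assumption, not something we have agreed on. How end-of-stream would be signalled is left out on purpose, since we have not defined it for this interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// SAM interface as proposed in this thread.
interface AdminResultsConsumer<T> {
    // Assumption: exactly one of the two arguments is non-null per call.
    void onNext(T next, Throwable t);
}

// Hypothetical stand-in for the admin client; only here to drive the callback.
final class FakeTopicSource {
    // Delivers each name in order, then the failure if one is given.
    static void describe(List<String> topicNames, Throwable failure,
                         AdminResultsConsumer<String> consumer) {
        for (String name : topicNames) {
            consumer.onNext(name, null);
        }
        if (failure != null) {
            consumer.onNext(null, failure);
        }
    }
}

final class LambdaExample {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<Throwable> errors = new ArrayList<>();

        // The caller supplies a lambda rather than an anonymous class.
        AdminResultsConsumer<String> consumer = (topic, error) -> {
            if (error != null) {
                errors.add(error);
            } else {
                seen.add(topic);
            }
        };

        FakeTopicSource.describe(Arrays.asList("orders", "payments"), null, consumer);
        System.out.println(seen); // prints [orders, payments]
    }
}
```

The null check in the lambda is the cost of keeping everything in one method; an Optional-like wrapper would move that check into the type.
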
The AdminResultsConsumer approach would use nullable arguments, like CompletableFuture#whenComplete. We could also use an Optional pattern here instead of nullables.

---

Summary:

So far, it seems like we are looking at these different options. The main difference in terms of API design is whether the user will need to implement more than one method, or whether a lambda can suffice.

1. Generic, Flow-like interface: AdminResultsSubscriber
2. DescribeTopicsStreamObserver (in this message above)
3. AdminResultsConsumer
4. AdminResultsConsumer with an Optional-like type instead of nullable arguments

-David

On Fri, Feb 23, 2024 at 4:00 PM José Armando García Sancio <jsan...@confluent.io.invalid> wrote:

> Hi Calvin
>
> On Fri, Feb 23, 2024 at 9:23 AM Calvin Liu <ca...@confluent.io.invalid>
> wrote:
> > As we agreed to implement the pagination for the new API
> > DescribeTopicPartitions, the client side must also add a proper interface
> > to handle the pagination.
> > The current KafkaAdminClient.describeTopics returns
> > the DescribeTopicsResult which is the future for querying all the topics.
> > It is awkward to fit the pagination into it because
>
> I suggest taking a look at Java's Flow API:
>
> https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Flow.html
> It was design for this specific use case and many libraries integrate with
> it.
>
> If the Kafka client cannot be upgraded to support the Java 9 which
> introduced that API, you can copy the same interface and semantics.
> This would allow users to easily integrate with reactive libraries
> since they all integrate with Java Flow.
>
> Thanks,
> --
> -José
>

--
-David