Hi there,

I'm working on a Java service that manages consumer group members,
especially to pause/resume them on demand. After some digging, it seems that
the current AdminClient makes this feasible through the
removeMembersFromConsumerGroup method [
https://kafka.apache.org/38/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html#removeMembersFromConsumerGroup(java.lang.String,org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions)].
However, when I tried to implement the solution, I found that the client was
returning an error saying something like "unknown member is not found". I
went through the sources to find the reason and found that the error was
accurate: the MemberToRemove class uses the static value
"JoinGroupRequest.UNKNOWN_MEMBER_ID" for the memberId field used in the
member deletion logic, so it is expected that the Kafka broker responds that
way.
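
To illustrate the current behaviour, here's a rough sketch of how a removal
looks with the public Admin API today, assuming the 3.8 client linked above
(StaticMemberRemoval, buildOptions and removeStaticMembers are hypothetical
names). MemberToRemove only takes a group.instance.id, so only static members
can be targeted; the memberId sent to the broker is always UNKNOWN_MEMBER_ID:

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

// Hypothetical helper: removal through the public Admin API as it exists today.
final class StaticMemberRemoval {

    private StaticMemberRemoval() {}

    // MemberToRemove only accepts a group.instance.id, so only static members
    // can be identified here; the client fills memberId with UNKNOWN_MEMBER_ID.
    static RemoveMembersFromConsumerGroupOptions buildOptions(
            Collection<String> groupInstanceIds, String reason) {
        List<MemberToRemove> members = groupInstanceIds.stream()
                .map(MemberToRemove::new)
                .collect(Collectors.toList());
        RemoveMembersFromConsumerGroupOptions options =
                new RemoveMembersFromConsumerGroupOptions(members);
        // Free-text reason sent along with the LeaveGroup request (added by KIP-800).
        options.reason(reason);
        return options;
    }

    // Sends the removal request and blocks until the broker answers.
    static void removeStaticMembers(Admin admin, String groupId,
            Collection<String> groupInstanceIds, String reason)
            throws InterruptedException, ExecutionException {
        admin.removeMembersFromConsumerGroup(groupId, buildOptions(groupInstanceIds, reason))
                .all()
                .get();
    }
}
```

For a dynamic member (no group.instance.id) there is nothing meaningful to
pass to MemberToRemove, which is where the "unknown member" error comes from.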

As a PoC, I updated the code so that a given memberId value (extracted from
the describeConsumerGroups method) can be set, and it worked as expected. I
created a PR https://github.com/apache/kafka/pull/18738 as a quick
explanation for an ASF Jira request. In that PR I was kindly asked to start
the conversation on this mailing list, so here we are.
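
For reference, a rough sketch of the lookup side of the PoC, again assuming
the 3.8 Admin API (MemberLookup and its methods are hypothetical names).
describeConsumerGroups exposes every member's memberId, and the members
without a group.instance.id are exactly the ones MemberToRemove cannot target
today. The removal-by-memberId step is left out on purpose, since that is what
the PR proposes and it is not part of the public API:

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberDescription;

// Hypothetical helper: finding the memberIds that would need to be removed.
final class MemberLookup {

    private MemberLookup() {}

    // Fetches the current members of one consumer group.
    static Collection<MemberDescription> describeMembers(Admin admin, String groupId)
            throws InterruptedException, ExecutionException {
        return admin.describeConsumerGroups(List.of(groupId))
                .describedGroups()
                .get(groupId)
                .get()
                .members();
    }

    // Returns the memberIds of dynamic members, i.e. those without a
    // group.instance.id; these cannot be expressed as a MemberToRemove today.
    static List<String> dynamicMemberIds(Collection<MemberDescription> members) {
        return members.stream()
                .filter(m -> m.groupInstanceId().isEmpty())
                .map(MemberDescription::memberId)
                .collect(Collectors.toList());
    }
}
```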

I realized that, to make sure removed members do not rejoin the consumer
group, the listener (we're using spring-kafka) would need to stop trying. I
guess this could be done by pausing it from the service code itself, based
on the reason given for the removal (the reason we pass to
removeMembersFromConsumerGroup). But everything hinges on being able to
remove the members by setting the proper memberId from the admin client.
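
On the spring-kafka side, a rough sketch of controlling the listener through
KafkaListenerEndpointRegistry (the ListenerControl class and the listener id
are hypothetical). One caveat, per the Spring for Apache Kafka reference docs:
a paused container keeps calling poll() so that it stays in the group, which
suggests pausing alone may not keep a removed member from rejoining; stopping
the container ends the poll loop and closes the consumer:

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

// Hypothetical helper around the registry that holds @KafkaListener containers.
final class ListenerControl {

    private ListenerControl() {}

    // Pauses delivery; the container keeps polling (no records) and stays in the group.
    static boolean pauseListener(KafkaListenerEndpointRegistry registry, String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null) {
            return false; // no listener registered with this id
        }
        container.pause();
        return true;
    }

    // Stops the container: the poll loop ends and the consumer is closed.
    static boolean stopListener(KafkaListenerEndpointRegistry registry, String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    // Starts a stopped container again; its consumer joins the group again.
    static boolean startListener(KafkaListenerEndpointRegistry registry, String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null) {
            return false;
        }
        container.start();
        return true;
    }
}
```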

Do you think it makes sense to be able to manage members this way? Are
there any security implications, or risks of Kafka service malfunction, in
handling them like that?

Thanks in advance,
