[ https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736483#comment-17736483 ]
Yash Mayya commented on KAFKA-15113:
------------------------------------

Thanks for reviewing the ticket, Chris! Yeah, I'm not sure whether it's a realistic use case either, but I guess it's possible today to set up a sink connector that consumes from a topic on one Kafka cluster but has its DLQ topic on another cluster, for example? So any change we'd make in this area would require a KIP, I presume.

I do like the idea of making it easier to configure common Kafka client override configurations, although I'm not so sure about changing the request structure for the create / update connector REST APIs just for this use case. It'd also be a little tricky to do the same with the `PUT /connectors/\{connector}/config` endpoint while maintaining compatibility.

> Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15113
>             Project: Kafka
>          Issue Type: Task
>          Components: KafkaConnect
>            Reporter: Yash Mayya
>            Priority: Minor
>
> Background reading -
>  * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
>  * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>
> From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
> {quote}Currently, admin clients are only instantiated for sink connectors to create the DLQ topic if required. So it seems like it could be technically possible for a sink connector's consumer client overrides to target a different Kafka cluster from its producer and admin client overrides. Such a setup won't work with this implementation of the get offsets API, as it uses an admin client to get a sink connector's consumer group offsets.
> However, I'm not sure we want to use a consumer client to retrieve the offsets either, as we shouldn't be disrupting the existing sink tasks' consumer group just to fetch offsets. Leveraging a sink task's consumer also isn't an option, because fetching offsets for a stopped sink connector (where all the tasks will be stopped) should be allowed. I'm wondering if we should document that a connector's various client config overrides shouldn't target different Kafka clusters (side note - looks like we don't [currently document|https://kafka.apache.org/documentation/#connect] client config overrides for Connect beyond just the worker property {{connector.client.config.override.policy}}).
> {quote}
>
> {quote}I don't think we need to worry too much about this. I cannot imagine a sane use case that involves overriding a connector's Kafka clients with different Kafka clusters (not just bootstrap servers, but actually different clusters) for producer/consumer/admin. I'd be fine with adding a note to our docs that that kind of setup isn't supported, but I really, really hope that it's not necessary and nobody's trying to do that in the first place. I also suspect that there are other places where this might cause issues, like with exactly-once source support or automatic topic creation for source connectors.
> That said, there is a different case we may want to consider: someone may have configured consumer overrides for a sink connector, but not admin overrides. This may happen if they don't use a DLQ topic. I don't know if we absolutely need to handle this now, and we may consider filing a follow-up ticket to look into it, but one quick-and-dirty thought I've had is to configure the admin client used here with a combination of the configurations for the connector's admin client and its consumer, giving precedence to the latter.
> {quote}
>
> Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -
> {quote}We will have undesirable behavior if the connector is targeting a Kafka cluster different from the Connect cluster's backing Kafka cluster and the user has configured the consumer overrides appropriately for their connector, but not the admin overrides (something we also discussed previously [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).
> In that case, if a user attempts to reset their sink connector's offsets via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following will occur:
> # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}}, which returns an empty partition offsets map for the sink connector's consumer group ID (the group actually exists on a different Kafka cluster from the one the admin client is connected to).
> # We call {{SinkConnector::alterOffsets}} with an empty offsets map, which could cause the sink connector to propagate the offsets reset to the sink system.
> # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}}, which fails with a {{GroupIdNotFoundException}} that we essentially swallow in order to keep offsets reset operations idempotent, and we return a success message to the user (even though the real consumer group for the sink connector on the other Kafka cluster hasn't been deleted).
> This will occur if the connector's admin overrides are missing OR the admin overrides are deliberately configured to target a Kafka cluster different from the consumer overrides (although, as you pointed out in the other linked thread, this doesn't seem like a valid use case that we'd even want to support).
> I guess we'd want to pursue the approach you suggested, where we'd configure the admin client with a combination of the connector's admin overrides and consumer overrides?
> Another option could potentially be to somehow verify that the {{admin.override.bootstrap.servers}} in the connector's config / {{admin.bootstrap.servers}} in the worker config / {{bootstrap.servers}} in the worker config (in order of preference) correspond to the same Kafka cluster as {{consumer.override.bootstrap.servers}} in the connector's config / {{consumer.bootstrap.servers}} in the worker config / {{bootstrap.servers}} in the worker config (in order of preference), and fail the request if we are able to reliably determine that they aren't pointing to the same Kafka cluster? I'm not sure that this is a feasible approach, however.
> Yet another option could be to remove the idempotency guarantee from the {{DELETE /connectors/\{connector}/offsets}} endpoint and, if we encounter a {{GroupIdNotFoundException}} from {{Admin::deleteConsumerGroups}}, return an error message to the user indicating that either the offsets have already been reset or they might need to check their connector's admin overrides (this does seem fairly ugly, though).
> Edit: A more elegant way might be to switch the offset reset mechanism from deleting the consumer group to deleting the offsets for all topic partitions via {{Admin::deleteConsumerGroupOffsets}} (similar to what we do for the {{PATCH /connectors/\{connector}/offsets}} endpoint when the offset for a partition is specified as {{null}}). This way we could explicitly check for the existence of the sink connector's consumer group prior to listing its offsets, and fail requests if the consumer group doesn't exist (the minor downside is that this will require an additional admin client request).
> {quote}

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
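Two of the config-based ideas discussed in this ticket can be made concrete with a little code: merging a connector's admin and consumer overrides with the consumer's settings taking precedence, and resolving each client's effective `bootstrap.servers` in the stated order of preference so the two can be compared. The Java below is a minimal sketch under stated assumptions, not the Connect runtime's implementation: the class `SinkAdminConfigSketch`, its methods, and the host names are hypothetical; worker and connector configs are modeled as plain `Map<String, String>`; and no `connector.client.config.override.policy` is applied.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the Kafka Connect runtime.
// Worker and connector configs are plain string maps; no override policy is applied.
public final class SinkAdminConfigSketch {

    private SinkAdminConfigSketch() {}

    // Collects the entries whose key starts with `prefix`, with the prefix stripped.
    static Map<String, String> stripPrefix(Map<String, String> config, String prefix) {
        Map<String, String> result = new HashMap<>();
        config.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), value);
            }
        });
        return result;
    }

    // Start from the connector's admin overrides and layer its consumer overrides on top,
    // so the consumer's settings win whenever both define the same key.
    static Map<String, String> mergedAdminOverrides(Map<String, String> connectorConfig) {
        Map<String, String> merged = new HashMap<>(stripPrefix(connectorConfig, "admin.override."));
        merged.putAll(stripPrefix(connectorConfig, "consumer.override."));
        return merged;
    }

    // Resolves bootstrap.servers for one client type ("admin" or "consumer") in the order
    // of preference from the ticket: connector override, worker prefixed, worker top-level.
    static String resolveBootstrapServers(
            Map<String, String> workerConfig, Map<String, String> connectorConfig, String clientType) {
        String fromConnector = connectorConfig.get(clientType + ".override.bootstrap.servers");
        if (fromConnector != null) {
            return fromConnector;
        }
        String fromWorkerPrefixed = workerConfig.get(clientType + ".bootstrap.servers");
        return fromWorkerPrefixed != null ? fromWorkerPrefixed : workerConfig.get("bootstrap.servers");
    }

    // Normalizes a comma-separated bootstrap list into a sorted set of trimmed host:port entries.
    static Set<String> normalize(String bootstrapServers) {
        if (bootstrapServers == null) {
            return new TreeSet<>();
        }
        return Arrays.stream(bootstrapServers.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // True only if admin and consumer resolve to the same set of bootstrap servers.
    // A false result does NOT prove that they target different clusters.
    static boolean sameBootstrapServers(Map<String, String> workerConfig, Map<String, String> connectorConfig) {
        return normalize(resolveBootstrapServers(workerConfig, connectorConfig, "admin"))
                .equals(normalize(resolveBootstrapServers(workerConfig, connectorConfig, "consumer")));
    }

    public static void main(String[] args) {
        // The worker points at the Connect cluster's backing Kafka cluster.
        Map<String, String> worker = Map.of("bootstrap.servers", "connect-kafka:9092");
        // Consumer overrides target another cluster, but there are no admin overrides.
        Map<String, String> connector = Map.of("consumer.override.bootstrap.servers", "other-kafka:9092");

        System.out.println(resolveBootstrapServers(worker, connector, "admin"));    // prints connect-kafka:9092
        System.out.println(resolveBootstrapServers(worker, connector, "consumer")); // prints other-kafka:9092
        System.out.println(sameBootstrapServers(worker, connector));                // prints false
        System.out.println(mergedAdminOverrides(connector)); // prints {bootstrap.servers=other-kafka:9092}
    }
}
```

Comparing bootstrap strings can only confirm that two clients use the same broker list; a mismatch does not prove that they target different clusters, since two different broker lists can belong to one cluster. That is why the ticket doubts this approach is feasible. A more reliable check would compare the cluster IDs returned by `Admin::describeCluster` for admin clients built from each resolved configuration, at the cost of extra network round trips.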