[ https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736483#comment-17736483 ]

Yash Mayya commented on KAFKA-15113:
------------------------------------

Thanks for reviewing the ticket, Chris! Yeah, I'm not sure whether it's a 
realistic use case either, but I guess it's possible today to set up a sink 
connector that consumes from a topic on one Kafka cluster but has a DLQ topic 
on another cluster, for example? So any change we'd make in this area would 
presumably require a KIP. I do like the idea of making it easier to configure 
common Kafka client override configurations, although I'm not so sure about 
changing the request structure of the create / update connector REST APIs just 
for this use case. It'd also be a little tricky to do the same with the `PUT 
/connectors/\{connector}/config` endpoint while maintaining compatibility.
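For concreteness, a minimal sketch of the kind of split-cluster setup in question (the connector name, class, and cluster addresses are all hypothetical; only the config key names come from Kafka Connect's client override mechanism):

```python
# Hypothetical sink connector config in which the consumer and admin clients
# target different Kafka clusters: records are consumed from cluster A, while
# the DLQ topic (created via the admin client, written via the producer)
# lives on cluster B. Names and addresses are illustrative.
connector_config = {
    "name": "my-sink",
    "connector.class": "com.example.MySinkConnector",
    "topics": "input-topic",
    "consumer.override.bootstrap.servers": "cluster-a:9092",
    "admin.override.bootstrap.servers": "cluster-b:9092",
    "producer.override.bootstrap.servers": "cluster-b:9092",
    "errors.deadletterqueue.topic.name": "my-sink-dlq",
}

def overrides_target_different_clusters(config):
    """Crude check: do the consumer and admin bootstrap overrides disagree?"""
    consumer = config.get("consumer.override.bootstrap.servers")
    admin = config.get("admin.override.bootstrap.servers")
    return consumer is not None and admin is not None and consumer != admin

print(overrides_target_different_clusters(connector_config))  # True
```

Note that comparing bootstrap server strings like this is only a rough signal, since the same cluster can be reachable via different addresses.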

> Gracefully handle cases where a sink connector's admin and consumer client 
> config overrides target different Kafka clusters
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15113
>             Project: Kafka
>          Issue Type: Task
>          Components: KafkaConnect
>            Reporter: Yash Mayya
>            Priority: Minor
>
> Background reading -
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
>  
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  
>  
> From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
> {quote}Currently, admin clients are only instantiated for sink connectors to 
> create the DLQ topic if required. So it seems like it could be technically 
> possible for a sink connector's consumer client overrides to target a 
> different Kafka cluster from its producer and admin client overrides. Such a 
> setup won't work with this implementation of the get offsets API as it is 
> using an admin client to get a sink connector's consumer group offsets. 
> However, I'm not sure we want to use a consumer client to retrieve the 
> offsets either as we shouldn't be disrupting the existing sink tasks' 
> consumer group just to fetch offsets. Leveraging a sink task's consumer also 
> isn't an option because fetching offsets for a stopped sink connector (where 
> all the tasks will be stopped) should be allowed. I'm wondering if we should 
> document that a connector's various client config override policies shouldn't 
> target different Kafka clusters (side note - looks like we don't [currently 
> document|https://kafka.apache.org/documentation/#connect] client config 
> overrides for Connect beyond just the worker property 
> {{{}connector.client.config.override.policy{}}}).
> {quote}
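A toy simulation of the failure mode the quote describes, with no real Kafka clients involved (each "cluster" is just a dict of consumer groups, and the connector name is hypothetical; Connect's default group id convention of `connect-` plus the connector name is the one assumed here):

```python
# Toy model: each "cluster" maps consumer group id -> committed offsets.
# This only illustrates why an admin client pointed at the wrong cluster
# cannot see the sink connector's consumer group offsets.
cluster_a = {"connect-my-sink": {"input-topic-0": 42}}  # consumer's cluster
cluster_b = {}  # cluster the admin client actually connects to

def list_consumer_group_offsets(cluster, group_id):
    # Mirrors the observable behavior of Admin::listConsumerGroupOffsets,
    # which yields an empty offsets map for an unknown group.
    return cluster.get(group_id, {})

group_id = "connect-my-sink"  # "connect-" + connector name
print(list_consumer_group_offsets(cluster_a, group_id))  # {'input-topic-0': 42}
print(list_consumer_group_offsets(cluster_b, group_id))  # {}
```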
>  
> {quote}I don't think we need to worry too much about this. I cannot imagine a 
> sane use case that involves overriding a connector's Kafka clients with 
> different Kafka clusters (not just bootstrap servers, but actually different 
> clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
> docs that that kind of setup isn't supported but I really, really hope that 
> it's not necessary and nobody's trying to do that in the first place. I also 
> suspect that there are other places where this might cause issues, like with 
> exactly-once source support or automatic topic creation for source connectors.
> That said, there is a different case we may want to consider: someone may 
> have configured consumer overrides for a sink connector, but not admin 
> overrides. This may happen if they don't use a DLQ topic. I don't know if we 
> absolutely need to handle this now and we may consider filing a follow-up 
> ticket to look into this, but one quick-and-dirty thought I've had is to 
> configure the admin client used here with a combination of the configurations 
> for the connector's admin client and its consumer, giving precedence to the
> latter.
> {quote}
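The quick-and-dirty merge suggested at the end of the quote can be sketched as a simple config overlay, with the consumer's configuration winning on conflicts (the config values below are hypothetical):

```python
def offsets_admin_config(admin_cfg, consumer_cfg):
    """Combine the connector's admin client config with its consumer config,
    giving precedence to the latter: later update() calls win on key clashes."""
    merged = dict(admin_cfg)
    merged.update(consumer_cfg)
    return merged

# Scenario from the quote: consumer overrides are set, admin overrides are not,
# so the admin config falls back to the worker's cluster.
admin_cfg = {"bootstrap.servers": "worker-cluster:9092"}
consumer_cfg = {"bootstrap.servers": "cluster-a:9092",
                "client.id": "my-sink-consumer"}
print(offsets_admin_config(admin_cfg, consumer_cfg)["bootstrap.servers"])
# cluster-a:9092
```

With this overlay, the admin client used for offset reads ends up pointing at the same cluster the sink tasks actually consume from.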
>  
> Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] 
> -
> {quote}We will have undesirable behavior if the connector is targeting a 
> Kafka cluster different from the Connect cluster's backing Kafka cluster and 
> the user has configured the consumer overrides appropriately for their 
> connector, but not the admin overrides (something we also discussed 
> previously 
> [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).
> In the above case, if a user attempts to reset their sink connector's offsets 
> via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
> will occur:
>  # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} 
> which returns an empty partition offsets map for the sink connector's 
> consumer group ID (it exists on a different Kafka cluster from the one that the 
> admin client is connecting to).
>  # We call {{SinkConnector::alterOffsets}} with an empty offsets map which 
> could cause the sink connector to propagate the offsets reset related changes 
> to the sink system.
>  # We attempt to delete the consumer group via 
> {{Admin::deleteConsumerGroups}} which returns {{GroupIdNotFoundException}} 
> which we essentially swallow in order to keep offsets reset operations 
> idempotent and return a success message to the user (even though the real 
> consumer group for the sink connector on the other Kafka cluster hasn't been 
> deleted).
> This will occur if the connector's admin overrides are missing OR the admin 
> overrides are deliberately configured to target a Kafka cluster different 
> from the consumer overrides (although like you pointed out in the other 
> linked thread, this doesn't seem like a valid use case that we'd even want to 
> support).
> I guess we'd want to pursue the approach you suggested where we'd configure 
> the admin client with a combination of the connector's admin overrides and 
> consumer overrides?
> Another option could potentially be to somehow verify that the 
> {{admin.override.bootstrap.servers}} in the connector's config / 
> {{admin.bootstrap.servers}} in the worker config / {{bootstrap.servers}} in 
> the worker config (in order of preference) correspond to the same Kafka 
> cluster as {{consumer.override.bootstrap.servers}} in the connector's config 
> / {{consumer.bootstrap.servers}} in the worker config / {{bootstrap.servers}} 
> in the worker config (in order of preference) and fail the request if we are 
> able to reliably determine that they aren't pointing to the same Kafka 
> clusters? I'm not sure that this is a feasible approach however.
> Yet another option could be to remove the idempotency guarantee from the 
> {{DELETE /connectors/\{connector}/offsets}} endpoint and if we encounter a 
> {{GroupIdNotFoundException}} from {{{}Admin::deleteConsumerGroups{}}}, return 
> an error message to the user indicating that either the offsets have already 
> been reset previously or else they might need to check their connector's 
> admin overrides (this does seem fairly ugly though).
> Edit: A more elegant way might be to switch the offset reset mechanism from 
> deleting the consumer group to deleting the offsets for all topic partitions 
> via {{Admin::deleteConsumerGroupOffsets}} (similar to what we do for the 
> {{PATCH /connectors/\{connector}/offsets}} endpoint when the offset for a 
> partition is specified as {{{}null{}}}). This way we could explicitly check 
> for existence of the sink connector's consumer group prior to listing its 
> offsets and fail requests if the consumer group doesn't exist (the minor 
> down-side is that this will require an additional admin client request).
> {quote}
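The bootstrap-server verification idea from the second option above can be sketched as a resolution function that walks the stated order of preference for each client type and then compares the results (worker and connector configs below are hypothetical, and, as noted in the quote, equal-string comparison is not a reliable test of cluster identity):

```python
def resolve_bootstrap(connector_cfg, worker_cfg, client):
    """Resolve bootstrap.servers for a client type ('admin' or 'consumer')
    in order of preference: connector-level override, client-prefixed worker
    config, then the worker's plain bootstrap.servers."""
    for key, cfg in ((f"{client}.override.bootstrap.servers", connector_cfg),
                     (f"{client}.bootstrap.servers", worker_cfg),
                     ("bootstrap.servers", worker_cfg)):
        if key in cfg:
            return cfg[key]
    return None

worker = {"bootstrap.servers": "backing-cluster:9092"}
connector = {"consumer.override.bootstrap.servers": "cluster-a:9092"}

admin_bs = resolve_bootstrap(connector, worker, "admin")        # backing-cluster:9092
consumer_bs = resolve_bootstrap(connector, worker, "consumer")  # cluster-a:9092
print(admin_bs != consumer_bs)  # True -> the request could be failed early
```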



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
