[
https://issues.apache.org/jira/browse/KAFKA-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581826#comment-17581826
]
Yash Mayya commented on KAFKA-13431:
------------------------------------
Hi [~erdody], pinging you here in case you missed my reply on the mailing
list thread: [https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj]
{noformat}
Hi Diego,
Thanks for writing this KIP. Are you still planning to work on this? If not,
I'd be happy to try and take this to completion!
Hi Chris,
Thanks for your valuable inputs as always!
> It looks like this KIP does not take the SinkTask::open and SinkTask::close
> methods into account
> I think we may want to add this type of support to the KIP so that we can
> solve the mutating SMT/asynchronous sink connector problem once and for all
Could you please clarify what you mean here? Do you mean to say that the
partitions passed to the SinkTask::open / SinkTask::close methods should not be
the original topic partitions? If so, how would we be able to change that while
maintaining backward compatibility? Would we want to add new methods to
SinkTask's public API which connectors could choose to implement instead? Also,
currently SinkTask::open is called when partitions are assigned to the task's
consumer in a consumer rebalance; at this point there is no way to know of any
"transformed" topic partitions; those can only be determined after the consumer
polls and the records are converted and transformed. Similarly, SinkTask::close is
called when partitions are revoked from the task's consumer in a consumer
rebalance. Are you suggesting we change when SinkTask::open and SinkTask::close
are called (maybe by book-keeping all currently known post-transformation topic
partitions)?
> I agree with the rationale for not exposing more of the original consumer
> record for the most part, but what about the record's offset?
Should we pollute the public API with a method that in all likelihood will
never be used? Maybe I'm lacking imagination here, but why would an SMT ever
want to modify the record's offset? Is there any such SMT currently? It seems
to me that adding such a method may just be unnecessary complexity and a
potential source of confusion to Connect developers, WDYT?
> do you think it'd make sense to separate out the newly-proposed
> SinkTask::originalTopicPartition method into separate SinkTask::originalTopic
> and SinkTask::originalKafkaPartition methods, to stay in line with the
> convention that's been loosely set by the existing, separate SinkTask::topic
> and SinkTask::kafkaPartition methods?
I would tend to agree with this rationale.
Thanks,
Yash{noformat}
cc [~ChrisEgerton] as well
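Below is a rough, dependency-free Java sketch of the direction discussed in the thread. The task keys its offset tracking on each record's pre-transformation coordinates, which are exposed through separate {{originalTopic}} / {{originalKafkaPartition}}-style accessors following the naming suggestion above. It also keeps a small original-to-transformed map so that a close on an original partition knows which transformed partitions it affects. Every type here ({{TopicPartition}}, {{TransformedRecord}}, {{AsyncOffsetTracker}}) is a hypothetical stand-in rather than the Connect API. Offsets are plain {{Long}} values instead of {{OffsetAndMetadata}}, and acknowledgements are assumed to arrive in offset order per partition.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Kafka's TopicPartition: a (topic, partition) pair.
record TopicPartition(String topic, int partition) {}

// Hypothetical stand-in for a sink record after SMTs have run. topic() and
// kafkaPartition() may have been rewritten; originalTopic() and
// originalKafkaPartition() mirror the separate accessors proposed in the thread
// and hold the coordinates the record was actually consumed from.
record TransformedRecord(String topic, int kafkaPartition,
                         String originalTopic, int originalKafkaPartition,
                         long kafkaOffset) {
    TopicPartition transformedPartition() {
        return new TopicPartition(topic, kafkaPartition);
    }

    TopicPartition originalPartition() {
        return new TopicPartition(originalTopic, originalKafkaPartition);
    }
}

// Hypothetical asynchronous sink task state.
class AsyncOffsetTracker {
    // Next offset to commit, keyed by ORIGINAL partition (the ones open() was given).
    private final Map<TopicPartition, Long> flushed = new HashMap<>();
    // Which post-transformation partitions each original partition has fed.
    private final Map<TopicPartition, Set<TopicPartition>> transformedByOriginal = new HashMap<>();

    // Analogue of put(): remember the original -> transformed mapping.
    void onPut(TransformedRecord r) {
        transformedByOriginal
            .computeIfAbsent(r.originalPartition(), k -> new HashSet<>())
            .add(r.transformedPartition());
    }

    // Called once the external system acknowledges the write of r.
    // Assumption: acknowledgements arrive in offset order per original partition.
    void onWriteAcknowledged(TransformedRecord r) {
        // A committed offset names the next record to consume, hence + 1.
        flushed.put(r.originalPartition(), r.kafkaOffset() + 1);
    }

    // Analogue of preCommit(): offsets keyed by the ORIGINAL partitions.
    Map<TopicPartition, Long> preCommit() {
        return new HashMap<>(flushed);
    }

    // Analogue of close() for one revoked original partition: drop its state and
    // return the transformed partitions whose writers the task should also close.
    Set<TopicPartition> onClose(TopicPartition original) {
        flushed.remove(original);
        Set<TopicPartition> affected = transformedByOriginal.remove(original);
        return affected == null ? Set.of() : affected;
    }
}
```

Keying the bookkeeping by the original coordinates is what lets the returned offsets line up with the partitions the framework handed to {{open}}, regardless of how the SMTs rewrote the topic.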
> Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit
> users)
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-13431
> URL: https://issues.apache.org/jira/browse/KAFKA-13431
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Diego Erdody
> Assignee: Diego Erdody
> Priority: Major
> Labels: needs-kip
>
> There's currently an incompatibility between Sink connectors overriding the
> {{SinkTask.preCommit}} method (for asynchronous processing) and SMTs that
> mutate the topic field.
> The problem has been present since the {{preCommit}} method was introduced and is
> rooted in a mismatch between the topic/partition that is passed to
> {{open/preCommit}} (the original topic and partition before applying any
> transformations) and the topic partition that is present in the SinkRecord
> that the {{SinkTask.put}} method receives (after transformations are
> applied). Since that's all the information the connector has to implement any
> kind of internal offset tracking, the topic partitions it returns from
> {{preCommit}} will correspond to the transformed topic, whereas the framework
> expects the original topic.
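The following minimal, dependency-free Java sketch makes the mismatch concrete. All names are hypothetical stand-ins (not Kafka's classes). The {{-routed}} suffix stands in for any topic-mutating SMT. The framework side is reduced to a lookup by the partition it originally assigned.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition: a (topic, partition) pair.
record TopicPartition(String topic, int partition) {}

// Hypothetical view of a record as put() receives it: only the post-SMT topic survives.
record PutRecord(String topic, int kafkaPartition, long kafkaOffset) {}

class TopicMutationMismatchDemo {
    // Hypothetical topic-mutating SMT: appends a fixed suffix to the topic name.
    static String renameTopic(String topic) {
        return topic + "-routed";
    }

    // The offsets an async task would report from preCommit when its bookkeeping
    // can only use what put() saw, i.e. the transformed topic.
    static Map<TopicPartition, Long> offsetsReportedByTask(String originalTopic, int partition,
                                                           long lastWrittenOffset) {
        PutRecord seen = new PutRecord(renameTopic(originalTopic), partition, lastWrittenOffset);
        Map<TopicPartition, Long> offsets = new HashMap<>();
        // The next offset to consume is the last written offset + 1.
        offsets.put(new TopicPartition(seen.topic(), seen.kafkaPartition()), seen.kafkaOffset() + 1);
        return offsets;
    }

    // Simplified framework side: it looks offsets up by the partition it actually
    // assigned (the original topic), so a key using the transformed topic yields nothing.
    static Long offsetForAssignedPartition(Map<TopicPartition, Long> reported,
                                           TopicPartition assigned) {
        return reported.get(assigned);
    }
}
```

For example, {{offsetsReportedByTask("orders", 3, 41L)}} reports offset 42 for partition 3 of {{orders-routed}}, while the lookup for the assigned partition 3 of {{orders}} returns {{null}}: the task's progress is invisible to the framework.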
--
This message was sent by Atlassian Jira
(v8.20.10#820010)