Hi Diego,

This is a long time coming and I'm glad to see someone's finally gotten around to filling in this feature gap for Connect.
It looks like this KIP does not take the SinkTask::open and SinkTask::close methods into account ( https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection) / https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)). Is this intentional? If so, it'd be nice to see a rationale for leaving this out in the rejected alternatives section; if not, I think we may want to add this kind of support to the KIP so that we can solve the mutating-SMT/asynchronous-sink-connector problem once and for all, instead of narrowing but not closing the existing feature gap.

If we opt to add support for open/close, we may want to take the current effort to add support for cooperative consumer groups ( https://issues.apache.org/jira/browse/KAFKA-12487 / https://github.com/apache/kafka/pull/10563) into account: the current behavior of Connect (invoking SinkTask::close for every topic partition every time a consumer rebalance occurs, then invoking SinkTask::open for all still-assigned partitions) may be easier to reason about, but it is likely to change soon (although holding off on that work if this KIP is given priority is also a valid option).

It also looks like we're only exposing the original topic partition to connector developers. I mostly agree with the rationale for not exposing more of the original consumer record, but what about the record's offset?
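To make the open/close concern above concrete, here is a minimal plain-Java sketch of the eager close-all-then-reopen rebalance behavior described earlier. This is NOT actual Connect framework code; the class and method names are illustrative, and partitions are modeled as plain "topic-partition" strings to keep the example self-contained:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch (NOT the real Connect runtime): on every consumer
// rebalance the framework closes ALL previously assigned partitions,
// then re-opens whatever is still (or newly) assigned.
public class RebalanceSketch {
    // Partitions this hypothetical sink task currently holds writers for.
    private final Set<String> openPartitions = new LinkedHashSet<>();

    // Stand-ins for SinkTask::open and SinkTask::close.
    public void open(Collection<String> partitions) {
        openPartitions.addAll(partitions);
    }

    public void close(Collection<String> partitions) {
        openPartitions.removeAll(partitions);
    }

    // Eager ("stop-the-world") rebalance: close everything first, then
    // re-open the new assignment, even for partitions the task keeps.
    public void onRebalance(Set<String> newAssignment) {
        close(new ArrayList<>(openPartitions));
        open(newAssignment);
    }

    // Sorted view of the current assignment, for deterministic output.
    public Set<String> assigned() {
        return new TreeSet<>(openPartitions);
    }

    public static void main(String[] args) {
        RebalanceSketch task = new RebalanceSketch();
        task.open(Set.of("topic-0", "topic-1"));
        // topic-1 survives the rebalance and topic-2 is newly assigned,
        // yet topic-1 is still closed and re-opened along the way.
        task.onRebalance(Set.of("topic-1", "topic-2"));
        System.out.println(task.assigned()); // [topic-1, topic-2]
    }
}
```

Under cooperative rebalancing (KAFKA-12487), only revoked partitions would be closed, which is exactly why the open/close contract here may be about to shift.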
Although it's not possible to override the Kafka offset for a sink record via the standard SinkRecord::newRecord methods ( https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long) / https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long,java.lang.Iterable)), there are still public constructors for the SinkRecord class that SMTs can leverage to return new SinkRecord instances that don't have the same Kafka offset as the record they've mutated. Do you think it may be worth the additional maintenance burden and API complexity to accommodate this case with something like a SinkRecord::originalKafkaOffset method?

I'm also wondering how exactly this will be implemented. Will we automatically create a new SinkRecord instance at the end of the transformation chain in order to provide the correct topic partition (and possibly offset)? If so, this should be called out, since it means that transformations that return custom subclasses of SinkRecord will no longer be able to do so (or rather, they will still be able to, but those custom subclasses will never be visible to sink tasks).

Finally, a small nit: do you think it'd make sense to split the newly-proposed SinkRecord::originalTopicPartition method into separate SinkRecord::originalTopic and SinkRecord::originalKafkaPartition methods, to stay in line with the convention loosely set by the existing, separate SinkRecord::topic and SinkRecord::kafkaPartition methods?
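To illustrate the idea of preserving original coordinates through a topic-mutating transformation, here is a hedged plain-Java sketch. This is NOT the real org.apache.kafka.connect.sink.SinkRecord API; the RoutedRecordSketch class and all of its member names are hypothetical stand-ins:

```java
// Hypothetical sketch: a record that carries its original Kafka
// coordinates (topic/partition/offset) through a topic-mutating
// transformation, so preCommit-style offset tracking keeps working.
public class RoutedRecordSketch {
    final String topic;               // possibly rewritten by an SMT
    final int kafkaPartition;
    final long kafkaOffset;
    final String originalTopic;       // set once by the framework, never rewritten
    final int originalKafkaPartition;
    final long originalKafkaOffset;

    // Constructor the framework would use when first creating the record:
    // the "original" coordinates start out equal to the current ones.
    public RoutedRecordSketch(String topic, int partition, long offset) {
        this(topic, partition, offset, topic, partition, offset);
    }

    private RoutedRecordSketch(String topic, int partition, long offset,
                               String origTopic, int origPartition, long origOffset) {
        this.topic = topic;
        this.kafkaPartition = partition;
        this.kafkaOffset = offset;
        this.originalTopic = origTopic;
        this.originalKafkaPartition = origPartition;
        this.originalKafkaOffset = origOffset;
    }

    // Analogue of SinkRecord::newRecord as a RegexRouter-style SMT would
    // call it: the copy gets new delivery coordinates but inherits the
    // original ones untouched.
    public RoutedRecordSketch newRecord(String newTopic, int newPartition, long newOffset) {
        return new RoutedRecordSketch(newTopic, newPartition, newOffset,
                originalTopic, originalKafkaPartition, originalKafkaOffset);
    }

    public static void main(String[] args) {
        RoutedRecordSketch fromKafka = new RoutedRecordSketch("orders", 3, 42L);
        // An SMT reroutes the record and (via a public constructor in the
        // real API) could even change the offset; the originals survive.
        RoutedRecordSketch routed = fromKafka.newRecord("orders-routed", 0, 0L);
        System.out.println(routed.topic + " / " + routed.originalTopic
                + " @ " + routed.originalKafkaOffset); // orders-routed / orders @ 42
    }
}
```

The open question above is whether the framework should guarantee this by rebuilding the record after the transformation chain, which is what would make custom SinkRecord subclasses invisible to tasks.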
I'm personally looking forward to leveraging this improvement in the BigQuery sink connector I help maintain, because we recently added a new write mode that uses asynchronous writes and SinkTask::preCommit, but encourage users to use SMTs to redirect records to different datasets/tables in BigQuery, which is currently impossible in that write mode.

Thanks for taking this on!

Cheers,

Chris

On Wed, Nov 3, 2021 at 6:17 PM Diego Erdody <erd...@gmail.com> wrote:

> Hello,
>
> I'd like to propose a small KIP to add a new field to SinkRecord in order
> to add support for topic-mutating SMTs (e.g. RegexRouter) to asynchronous
> Sink Connectors (the ones that override preCommit for internal offset
> tracking, like S3
> <https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L274>
> ).
>
> Links:
>
> - KIP-793: Sink Connectors: Support topic-mutating SMTs for async
>   connectors (preCommit users)
>   <https://cwiki.apache.org/confluence/x/fpFnCw>
> - PR #11464 <https://github.com/apache/kafka/pull/11464>
>
> Thanks,
>
> Diego