Hi Diego,

This has been a long time coming, and I'm glad to see someone's finally
gotten around to filling in this feature gap for Connect.

It looks like this KIP does not take the SinkTask::open and SinkTask::close
methods into account (
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
/
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)).
Is this intentional? If so, it'd be nice to see a rationale for leaving
this out in the rejected alternatives section; if not, I think we may want
to add this type of support to the KIP so that we can solve the mutating
SMT/asynchronous sink connector problem once and for all, instead of
narrowing but not closing the existing feature gap. If we opt to add
support for open/close, we may want to take the current effort to add
support for cooperative consumer groups (
https://issues.apache.org/jira/browse/KAFKA-12487 /
https://github.com/apache/kafka/pull/10563) into account. The current
behavior of Connect (invoking SinkTask::close for every topic partition
every time a consumer rebalance occurs, then invoking SinkTask::open for
all still-assigned partitions) may be easier to reason about, but it is
likely to change soon, although holding off on that work if this KIP is
given priority is definitely a valid option.

It also looks like we're only exposing the original topic partition to
connector developers. I agree with the rationale for not exposing more of
the original consumer record for the most part, but what about the record's
offset? Although it's not possible to override the Kafka offset for a sink
record via the standard SinkRecord::newRecord methods (
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long)
/
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long,java.lang.Iterable)),
there are still public constructors available for the SinkRecord class that
can be leveraged by SMTs to return new SinkRecord instances that don't have
the same Kafka offset as the one that they've mutated. Do you think it may
be worth the additional maintenance burden and API complexity to
accommodate this case, with something like a SinkRecord::originalKafkaOffset
method?
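
A minimal sketch of that gap follows. It uses a hypothetical, simplified `SimpleSinkRecord` in place of the real SinkRecord (plain Java, no Kafka dependency). As with SinkRecord::newRecord, the `newRecord` method here carries the offset over, but a public constructor lets a transformation pick any offset it likes. `RenameTopic` and `RewriteOffset` are hypothetical SMTs for illustration only.

```java
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-in for SinkRecord.
class SimpleSinkRecord {
    final String topic;
    final int kafkaPartition;
    final long kafkaOffset;

    // Public constructor: like SinkRecord's, it accepts an arbitrary offset.
    SimpleSinkRecord(String topic, int kafkaPartition, long kafkaOffset) {
        this.topic = topic;
        this.kafkaPartition = kafkaPartition;
        this.kafkaOffset = kafkaOffset;
    }

    // Like SinkRecord::newRecord: no offset parameter, so the offset is preserved.
    SimpleSinkRecord newRecord(String topic, int kafkaPartition) {
        return new SimpleSinkRecord(topic, kafkaPartition, kafkaOffset);
    }
}

// Hypothetical SMT that reroutes via newRecord; the consumed offset survives.
class RenameTopic implements UnaryOperator<SimpleSinkRecord> {
    public SimpleSinkRecord apply(SimpleSinkRecord r) {
        return r.newRecord("db." + r.topic, r.kafkaPartition);
    }
}

// Hypothetical SMT that calls the constructor directly and replaces the
// offset; downstream code can no longer see the consumed offset.
class RewriteOffset implements UnaryOperator<SimpleSinkRecord> {
    public SimpleSinkRecord apply(SimpleSinkRecord r) {
        return new SimpleSinkRecord(r.topic, r.kafkaPartition, 0L);
    }
}
```

For a record consumed at offset 42, a task committing `kafkaOffset + 1` would commit 43 after `RenameTopic` but 1 after `RewriteOffset`. An original-offset accessor would cover the second case.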

I'm also wondering how exactly this method will be implemented. Will
we automatically create a new SinkRecord instance at the end of the
transformation chain in order to provide the correct topic partition (and
possibly offset)? If so, this should be called out since it means that
transformations that return custom subclasses of SinkRecord will no longer
be able to do so (or rather, they will still be able to, but these custom
subclasses will never be visible to sink tasks).
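
One way the worker could do this is sketched below with hypothetical plain-Java classes. `ChainRecord`, `CustomChainRecord`, `OriginalAwareRecord`, and `TransformChainModel` are illustrative names, not the KIP's or the PR's actual code. The worker captures the consumed record's coordinates before the chain runs, then copies the chain's output into a new record that also carries them. The sketch makes the side effect visible: the subclass returned by the last transformation never reaches the task.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, simplified record type flowing through the transformation chain.
class ChainRecord {
    final String topic;
    final int partition;
    final long offset;

    ChainRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// A custom subclass that some SMT might return.
class CustomChainRecord extends ChainRecord {
    final String extra;

    CustomChainRecord(String topic, int partition, long offset, String extra) {
        super(topic, partition, offset);
        this.extra = extra;
    }
}

// What the worker would hand to the task: transformed fields plus the
// coordinates of the record as it was consumed.
class OriginalAwareRecord extends ChainRecord {
    final String originalTopic;
    final int originalPartition;
    final long originalOffset;

    OriginalAwareRecord(ChainRecord transformed, ChainRecord consumed) {
        super(transformed.topic, transformed.partition, transformed.offset);
        this.originalTopic = consumed.topic;
        this.originalPartition = consumed.partition;
        this.originalOffset = consumed.offset;
    }
}

class TransformChainModel {
    // Applies each transformation in order; a null result drops the record,
    // as with Connect SMTs. The survivor is re-wrapped, discarding its subclass.
    static ChainRecord applyAndWrap(ChainRecord consumed,
                                    List<UnaryOperator<ChainRecord>> chain) {
        ChainRecord current = consumed;
        for (UnaryOperator<ChainRecord> transform : chain) {
            current = transform.apply(current);
            if (current == null) {
                return null;
            }
        }
        return new OriginalAwareRecord(current, consumed);
    }
}
```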

Finally, a small nit: do you think it'd make sense to split the
newly-proposed SinkRecord::originalTopicPartition method into separate
SinkRecord::originalTopic and SinkRecord::originalKafkaPartition methods, to
stay in line with the convention that's been loosely set by the existing,
separate SinkRecord::topic and SinkRecord::kafkaPartition methods?
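
For illustration, the split accessor layout could look roughly like this hypothetical, trimmed-down model. It is plain Java: `SinkRecordShape` is not a real class, the original* names are only the ones floated in this thread, and `originalKey` is a hypothetical convenience helper. Callers that want a combined key can still build one from the two accessors.

```java
// Hypothetical, trimmed-down model of the split accessor layout.
class SinkRecordShape {
    private final String topic;
    private final Integer kafkaPartition;
    private final String originalTopic;
    private final Integer originalKafkaPartition;

    SinkRecordShape(String topic, Integer kafkaPartition,
                    String originalTopic, Integer originalKafkaPartition) {
        this.topic = topic;
        this.kafkaPartition = kafkaPartition;
        this.originalTopic = originalTopic;
        this.originalKafkaPartition = originalKafkaPartition;
    }

    // Existing-style accessors: reflect any changes made by SMTs.
    String topic() { return topic; }
    Integer kafkaPartition() { return kafkaPartition; }

    // Proposed-style accessors: the values as returned by the consumer.
    String originalTopic() { return originalTopic; }
    Integer originalKafkaPartition() { return originalKafkaPartition; }

    // Hypothetical helper: a combined key built from the separate accessors.
    String originalKey() { return originalTopic + "-" + originalKafkaPartition; }
}
```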

I'm personally looking forward to leveraging this improvement in the
BigQuery sink connector I help maintain. We recently added a new write mode
that uses asynchronous writes and SinkTask::preCommit, but we also encourage
users to use SMTs to redirect records to different datasets/tables in
BigQuery, which is currently impossible in that write mode. Thanks for
taking this on!
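
To make the use case concrete, here's a rough, hypothetical model of that kind of write mode. It is plain Java and not the BigQuery connector's code; `TP` and `AsyncWriterModel` are illustrative names. Each write is routed by the post-SMT topic, while committable offsets are tracked by the original topic partition. A preCommit-style method then reports them as the next offset to consume (last acknowledged offset + 1). As a simplification, it assumes writes for a given partition complete in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition.
record TP(String topic, int partition) {}

class AsyncWriterModel {
    // Destination table (derived from the post-SMT topic) -> offsets written there.
    final Map<String, List<Long>> tables = new HashMap<>();
    // Original topic partition -> highest offset whose write has completed.
    // Simplification: assumes writes for a partition complete in order.
    private final Map<TP, Long> acked = new HashMap<>();

    // Simulates a completed asynchronous write of a single record.
    void onWriteAcked(String postSmtTopic, TP original, long offset) {
        tables.computeIfAbsent(postSmtTopic, t -> new ArrayList<>()).add(offset);
        acked.merge(original, offset, Long::max);
    }

    // Loosely mirrors SinkTask::preCommit's return value: the next offset to
    // consume for each ORIGINAL partition with completed writes.
    Map<TP, Long> preCommit() {
        Map<TP, Long> toCommit = new HashMap<>();
        acked.forEach((tp, offset) -> toCommit.put(tp, offset + 1));
        return toCommit;
    }
}
```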

Cheers,

Chris

On Wed, Nov 3, 2021 at 6:17 PM Diego Erdody <erd...@gmail.com> wrote:

> Hello,
>
> I'd like to propose a small KIP to add a new field to SinkRecord in order
> to add support for topic-mutating SMTs (e.g. RegexRouter) to asynchronous
> Sink Connectors (the ones that override preCommit for internal offset
> tracking, like S3
> <https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L274>).
>
> Links:
>
> - KIP-793: Sink Connectors: Support topic-mutating SMTs for async
> connectors (preCommit users)
> <https://cwiki.apache.org/confluence/x/fpFnCw>
> - PR #11464 <https://github.com/apache/kafka/pull/11464>
>
> Thanks,
>
> Diego
>
