Hi Andrew,

Thanks for your comments.

Based on my understanding of the SinkTask Javadoc at
https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/sink/SinkTask.html,
put(...) can be asynchronous but does not have to be. Specifically, the
Javadoc says: "As records are fetched from Kafka, they will be passed to the sink
task using the put(Collection) API, which should either write them to the
downstream system or batch them for later writing. Periodically, Connect
will call flush(Map) to ensure that batched records are actually pushed to
the downstream system."
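
To make the "batch them for later writing" option concrete, here is a
minimal sketch. It is a plain-Java stand-in and does not extend the real
org.apache.kafka.connect.sink.SinkTask; the class name, the String record
type, and the pending()/written() helpers are all assumptions made up for
illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Plain-Java stand-in for a sink task. It does NOT extend the real
// org.apache.kafka.connect.sink.SinkTask; all names here are hypothetical.
public class BufferingSinkSketch {
    // Records accepted by put(...) but not yet written downstream.
    private final List<String> buffer = new ArrayList<>();
    // Stand-in for the downstream system.
    private final List<String> downstream = new ArrayList<>();

    // Mirrors the "batch them for later writing" option: only buffers.
    public void put(Collection<String> records) {
        buffer.addAll(records);
    }

    // Mirrors flush(Map): push everything batched so far downstream.
    public void flush() {
        downstream.addAll(buffer);
        buffer.clear();
    }

    // Number of records buffered but not yet flushed.
    public int pending() {
        return buffer.size();
    }

    // Records that have reached the stand-in downstream system.
    public List<String> written() {
        return downstream;
    }

    public static void main(String[] args) {
        BufferingSinkSketch task = new BufferingSinkSketch();
        task.put(Arrays.asList("a", "b"));
        System.out.println("pending before flush: " + task.pending());
        task.flush();
        System.out.println("written after flush: " + task.written());
    }
}
```

A task written this way only learns that a record is bad when it actually
writes it, which may be during flush rather than put.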

As for sending errant records only within put(...) being too restrictive:
as Randall pointed out, that wording was an attempt to describe the basic
behavior, not a requirement that the reporter be called only within
put(...).

I see your point about the previous pattern of adding methods that the task
uses to SinkTaskContext. The main concerns I have with this approach are:

1. The added effort for a connector developer who wants to use this
functionality: checking whether the method exists, and implementing that
check across all connectors.
2. Since the configs() method was added two years ago, the Connect ecosystem
has grown a lot, which brings additional compatibility issues and makes
users slower to upgrade their version of Connect.
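
To illustrate the kind of check I mean in point 1, here is a rough sketch
using reflection. It is only one possible way to do it, and everything in
it is an assumption: the method name errantRecordReporter and the
OldContext/NewContext classes are hypothetical stand-ins, not the real
SinkTaskContext.

```java
import java.lang.reflect.Method;

public class ReporterLookupSketch {
    // Stand-in for the context of an older runtime: no reporter accessor.
    public static class OldContext { }

    // Stand-in for the context of a newer runtime. The method name
    // "errantRecordReporter" is hypothetical.
    public static class NewContext {
        public Object errantRecordReporter() {
            return "reporter";
        }
    }

    // Returns the reporter if the context offers the accessor, else null.
    public static Object findReporter(Object context) {
        try {
            Method m = context.getClass().getMethod("errantRecordReporter");
            return m.invoke(context);
        } catch (NoSuchMethodException e) {
            // Older runtime: the accessor does not exist.
            return null;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reporter lookup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("old runtime: " + findReporter(new OldContext()));
        System.out.println("new runtime: " + findReporter(new NewContext()));
    }
}
```

Every connector that wants to run on both old and new runtimes would need
some version of this lookup and a fallback path for the null case.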

Please let me know what you think.

Thanks,
Aakash

On Mon, May 11, 2020 at 2:49 PM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Aakash,
> Thanks for sorting out the replies to the mailing list.
>
> First, I do like the idea of improving error reporting in sink connectors.
> I'd like a simple way to put bad records onto the DLQ.
>
> I think this KIP is considerably more complicated than it seems. The
> guidance on the SinkTask.put() method is that it should send the records
> asynchronously and immediately return, so the task is likely to want to
> report errors asynchronously too. Currently the KIP states that "the task
> can send errant records to it within put(...)" and that's too restrictive.
> The task ought to be able to report any unflushed records, but the
> synchronisation of this is going to be tricky. I suppose the connector
> author needs to make sure that all errant records have been reported
> before returning control from SinkTask.flush(...) or perhaps
> SinkTask.preCommit(...).
>
> I think the interface is a little strange too. I can see that this was
> done so it's possible to deliver a connector that supports error reporting
> but can also work in earlier versions of the KC runtime. But the pattern
> so far is that the task uses the methods of SinkTaskContext to access
> utilities in the Kafka Connect runtime, and I suggest that reporting a bad
> record is such a utility. SinkTaskContext has changed before, when the
> configs() method was added, so I think there is precedent for adding a
> method. The way the KIP adds a method to SinkTask that the KC runtime
> calls to provide the error reporting utility seems not to match what has
> gone before.
>
> Thanks,
> Andrew
>
> On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
>
>     I wasn't previously added to the dev mailing list, so I'd like to post
>     my discussion with Andrew Schofield below for visibility and further
>     discussion:
>
>     Hi Andrew,
>
>     Thanks for the reply. The main concern with this approach would be its
>     backward compatibility. I’ve highlighted the thoughts around the
>     backward compatibility of the initial approach; please let me know
>     what you think.
>
>     Thanks,
>     Aakash
>
>
> ____________________________________________________________________________________________________________________________
>
>     Hi,
>     By adding a new method to the SinkTaskContext interface in, say, Kafka
>     2.6, a connector that calls it would require a Kafka 2.6 Connect
>     runtime. I don't quite see how that's a backward compatibility problem.
>     It's just that new connectors need the latest interface. I might not
>     quite be understanding, but I think it would be fine.
>
>     Thanks,
>     Andrew
>
>
> ____________________________________________________________________________________________________________________________
>
>     Hi Andrew,
>
>     I apologize for the way the reply was sent. I just subscribed to the
>     dev mailing list so it should be resolved now.
>
>     You are correct, new connectors would simply require the latest
>     interface. However, we want to remove that requirement. In other
>     words, we want to allow for someone who wants the latest connector,
>     or wants to upgrade to the latest version, but deploys it on an older
>     version of AK. Basically, we don't want to force an AK upgrade just to
>     get the latest interface. In the current approach, there would be no
>     issue deploying a new connector on an older version of AK, as the
>     Connect framework would simply not invoke the new method.
>
>     Please let me know what you think and if I need to clarify anything.
>
>     Thanks,
>     Aakash
>
>
