Hi Randall,

Thanks for the feedback.

1. This is a great suggestion, but I find that adding an overloaded
put(...), which essentially deprecates the old put(...) so that it is only
used when a connector is deployed on older versions of Connect, adds a
real complication. Connectors could break if the old put(...) doesn't
correctly invoke the overloaded put(...); otherwise, functionality ends up
duplicated across the two put(...) methods. I think the original setter
method simplifies things, since a DLQ/error reporter simply will or will
not be passed to the task depending on the version of AK. However, I also
understand the aesthetic advantage of this method vs the setter method, so
I am okay with going in this direction if others agree with adding the
overloaded put(...).

2. Yes, your assumption is correct, and we can remove the "Order of
Operations" section if we go with the overloaded put(...) direction.

3. Great point, I will remove them from the KIP.

4. Yeah, accept(...) will be synchronous. I will change it to be clearer,
thanks.

5. This KIP will use existing metrics as well as introduce new metrics. I
will update this section to fully specify the metrics.
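
To make the trade-off in point 1 concrete, here is a minimal sketch of the
overloaded put(...) pattern. It is only an illustration under assumptions:
SinkRecord and SinkTask below are stripped-down stand-ins for the real
Connect classes, ExampleSinkTask and its fields are hypothetical, and the
reporter is modeled as a synchronous BiConsumer<SinkRecord, Throwable>
because the thread has not settled on a final type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Drives the same task the way an older and a newer runtime would.
class OverloadedPutSketch {
    public static void main(String[] args) {
        List<SinkRecord> batch = Arrays.asList(new SinkRecord("ok"), new SinkRecord(""));

        // Older runtime: only knows put(Collection), so no reporter exists.
        ExampleSinkTask onOldRuntime = new ExampleSinkTask();
        onOldRuntime.put(batch);

        // Newer runtime: calls the overload and supplies a reporter.
        List<SinkRecord> reported = new ArrayList<>();
        ExampleSinkTask onNewRuntime = new ExampleSinkTask();
        onNewRuntime.put(batch, (record, error) -> reported.add(record));

        System.out.println("old runtime dropped: " + onOldRuntime.dropped);
        System.out.println("new runtime reported: " + reported.size());
    }
}

// Stand-in for org.apache.kafka.connect.sink.SinkRecord (assumption).
class SinkRecord {
    final String value;

    SinkRecord(String value) {
        this.value = value;
    }
}

// Stand-in for SinkTask with the proposed overload added (assumption).
abstract class SinkTask {
    public abstract void put(Collection<SinkRecord> records);

    // Default delegates to the old method so existing connectors keep working.
    public void put(Collection<SinkRecord> records,
                    BiConsumer<SinkRecord, Throwable> reporter) {
        put(records);
    }
}

// Hypothetical connector task that works with or without a reporter.
class ExampleSinkTask extends SinkTask {
    final List<String> written = new ArrayList<>();
    int dropped = 0;

    @Override
    public void put(Collection<SinkRecord> records) {
        // Older runtimes land here; forward with no reporter.
        put(records, null);
    }

    @Override
    public void put(Collection<SinkRecord> records,
                    BiConsumer<SinkRecord, Throwable> reporter) {
        for (SinkRecord record : records) {
            if (record.value.isEmpty()) {
                if (reporter != null) {
                    // Synchronous call: returns once the reporter has handled it.
                    reporter.accept(record, new IllegalArgumentException("empty value"));
                } else {
                    dropped++; // no reporter available on an older runtime
                }
                continue;
            }
            written.add(record.value);
        }
    }
}
```

The breakage I am worried about shows up if a task forwards put(Collection)
to the overload but forgets to override the overload itself: the default
implementation then calls back into put(Collection), and the two methods
recurse until the stack overflows.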

Please let me know what you think.

Thanks,
Aakash

On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com> wrote:

> Hi, Aakash.
>
> Thanks for the KIP. Connect does need an improved ability for sink
> connectors to report individual records as being problematic, and this
> integrates nicely with the existing DLQ feature.
>
> I also appreciate the desire to maintain compatibility so that connectors
> can take advantage of this feature when deployed in a runtime that supports
> this feature, but can safely and easily do without the feature when
> deployed to an older runtime. But I do understand Andrew's concern about
> the aesthetics. Have you considered overloading the `put(...)` method and
> adding the `reporter` as a second parameter? Essentially it would add the
> one method (with proper JavaDoc) to `SinkTask` only:
>
> ```
>     public void put(Collection<SinkRecord> records,
>                     BiFunction<SinkRecord, Throwable> reporter) {
>         put(records);
>     }
> ```
> and the WorkerSinkTask would be changed to call `put(Collection,
> BiFunction)` instead.
>
> Sink connector implementations that don't do anything different can still
> override `put(Collection)`, and it still works as before. Developers that
> want to change their sink connector implementations to support this new
> feature would do the following, which would work in older and newer Connect
> runtimes:
> ```
>     public void put(Collection<SinkRecord> records) {
>         put(records, null);
>     }
>     public void put(Collection<SinkRecord> records,
>                     BiFunction<SinkRecord, Throwable> reporter) {
>         // the normal `put(Collection)` logic goes here, but can optionally
>         // use `reporter` if non-null
>     }
> ```
>
> I think this has all the same benefits of the current KIP, but
> it's noticeably simpler and hopefully more aesthetically pleasing.
>
> As for Andrew's second concern, that "the task can send errant records to
> it within put(...)" is too restrictive: my guess is that this was more
> an attempt at describing the basic behavior, and less about requiring that
> the reporter only be called within the `put(...)` method and not by methods
> to which `put(...)` synchronously or asynchronously delegates. Can you
> confirm whether my assumption is correct? If so, then perhaps my suggestion
> helps work around this issue as well, since there would be no restriction
> on when the reporter is called, and the whole "Order of Operations" section
> could potentially be removed.
>
> Third, it's not clear to me why the "Error Reporter Object" subsection in
> the "Proposal" section lists the worker configuration properties that were
> previously introduced with
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> .
> Maybe it's worth mentioning that the error reporter functionality will
> reuse or build upon KIP-298, including reusing the configuration properties
> defined in KIP-298. But IIUC, the KIP does not propose changing any
> technical or semantic aspect of these configuration properties, and
> therefore the KIP would be more clear and succinct without them. *That* the
> error reporter will use these properties is part of the UX and therefore
> necessary to mention, but *how* it uses those properties is really up to
> the implementation.
>
> Fourth, the "Synchrony" section has a sentence that is confusing, or not as
> clear as it could be.
>
>     "If a record is sent to the error reporter, processing of the next
> errant record in accept(...) will not begin until the producer successfully
> sends the errant record to Kafka."
>
> This sentence is a bit difficult to understand, but IIUC this really just
> means that "accept(...)" will be synchronous and will block until the
> errant record has been successfully written to Kafka. If so, let's say
> that. The rest of the paragraph is fine.
>
> Finally, is this KIP proposing new metrics, or that existing metrics would
> be used to track the error reporter usage? If the former, then please
> fully-specify what these metrics will be, similarly to how metrics are
> specified in
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
> .
>
> Thoughts?
>
> Best regards,
>
> Randall
>
> On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
>
> > Hi Aakash,
> > Thanks for sorting out the replies to the mailing list.
> >
> > First, I do like the idea of improving error reporting in sink
> > connectors. I'd like a simple way to put bad records onto the DLQ.
> >
> > I think this KIP is considerably more complicated than it seems. The
> > guidance on the SinkTask.put() method is that it should send the records
> > asynchronously and immediately return, so the task is likely to want to
> > report errors asynchronously too. Currently the KIP states that "the
> > task can send errant records to it within put(...)" and that's too
> > restrictive. The task ought to be able to report any unflushed records,
> > but the synchronisation of this is going to be tricky. I suppose the
> > connector author needs to make sure that all errant records have been
> > reported before returning control from SinkTask.flush(...) or perhaps
> > SinkTask.preCommit(...).
> >
> > I think the interface is a little strange too. I can see that this was
> > done so it's possible to deliver a connector that supports error
> > reporting but it can also work in earlier versions of the KC runtime.
> > But, the pattern so far is that the task uses the methods of
> > SinkTaskContext to access utilities in the Kafka Connect runtime, and I
> > suggest that reporting a bad record is such a utility. SinkTaskContext
> > has changed before when the configs() method was added, so I think there
> > is precedent for adding a method. The way the KIP adds a method to
> > SinkTask that the KC runtime calls to provide the error reporting
> > utility seems not to match what has gone before.
> >
> > Thanks,
> > Andrew
> >
> > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
> >
> >     I wasn't previously added to the dev mailing list, so I'd like to
> >     post my discussion with Andrew Schofield below for visibility and
> >     further discussion:
> >
> >     Hi Andrew,
> >
> >     Thanks for the reply. The main concern with this approach would be
> >     its backward compatibility. I’ve highlighted the thoughts around the
> >     backwards compatibility of the initial approach, please let me know
> >     what you think.
> >
> >     Thanks,
> >     Aakash
> >
> >
> >
> >     ________________________________________________________________
> >
> >     Hi,
> >     By adding a new method to the SinkTaskContext interface in say Kafka
> >     2.6, a connector that calls it would require a Kafka 2.6 connect
> >     runtime. I don't quite see how that's a backward compatibility
> >     problem. It's just that new connectors need the latest interface. I
> >     might not quite be understanding, but I think it would be fine.
> >
> >     Thanks,
> >     Andrew
> >
> >
> >
> >     ________________________________________________________________
> >
> >     Hi Andrew,
> >
> >     I apologize for the way the reply was sent. I just subscribed to the
> >     dev mailing list so it should be resolved now.
> >
> >     You are correct, new connectors would simply require the latest
> >     interface. However, we want to remove that requirement - in other
> >     words, we want to allow the possibility that someone wants the latest
> >     connector/to upgrade to the latest version, but deploys it on an
> >     older version of AK. Basically, we don't want to enforce the
> >     necessity of upgrading AK to get the latest interface. In the current
> >     approach, there would be no issue of deploying a new connector on an
> >     older version of AK, as the Connect framework would simply not invoke
> >     the new method.
> >
> >     Please let me know what you think and if I need to clarify anything.
> >
> >     Thanks,
> >     Aakash
> >
> >
>
